From 69e868d04712d5dca6990b8d5135994dd574e60e Mon Sep 17 00:00:00 2001 From: Wanda Date: Wed, 6 Mar 2024 05:23:47 +0100 Subject: [PATCH] Implement RFC 50: `Print` and string formatting. Co-authored-by: Catherine --- amaranth/__init__.py | 1 + amaranth/asserts.py | 13 +- amaranth/back/rtlil.py | 111 ++++++++++----- amaranth/hdl/__init__.py | 2 + amaranth/hdl/_ast.py | 290 +++++++++++++++++++++++++++++++++++---- amaranth/hdl/_dsl.py | 8 +- amaranth/hdl/_ir.py | 42 +++++- amaranth/hdl/_nir.py | 155 +++++++++++++++++++-- amaranth/hdl/_xfrm.py | 40 +++++- amaranth/lib/fifo.py | 3 +- amaranth/sim/_pyrtl.py | 88 ++++++++++-- docs/changes.rst | 8 ++ docs/guide.rst | 85 ++++++++++++ docs/reference.rst | 3 + tests/test_hdl_ast.py | 155 ++++++++++++++++++++- tests/test_hdl_dsl.py | 2 +- tests/test_hdl_ir.py | 89 ++++++++++-- tests/test_lib_coding.py | 1 - tests/test_lib_fifo.py | 2 +- tests/test_sim.py | 104 +++++++++++++- 20 files changed, 1090 insertions(+), 112 deletions(-) diff --git a/amaranth/__init__.py b/amaranth/__init__.py index 4094b99b5..9e60ff032 100644 --- a/amaranth/__init__.py +++ b/amaranth/__init__.py @@ -16,6 +16,7 @@ __all__ = [ "Shape", "unsigned", "signed", "Value", "Const", "C", "Mux", "Cat", "Array", "Signal", "ClockSignal", "ResetSignal", + "Format", "Print", "Assert", "Module", "ClockDomain", "Elaboratable", "Fragment", "Instance", diff --git a/amaranth/asserts.py b/amaranth/asserts.py index e7701ba37..054550868 100644 --- a/amaranth/asserts.py +++ b/amaranth/asserts.py @@ -1,4 +1,15 @@ -from .hdl._ast import AnyConst, AnySeq, Initial, Assert, Assume, Cover +from .hdl._ast import AnyConst, AnySeq, Initial +from . import hdl as __hdl __all__ = ["AnyConst", "AnySeq", "Initial", "Assert", "Assume", "Cover"] + + +def __getattr__(name): + import warnings + if name in __hdl.__dict__ and name in __all__: + if not (name.startswith("__") and name.endswith("__")): + warnings.warn(f"instead of `{__name__}.{name}`, use `{__hdl.__name__}.{name}`", + DeprecationWarning, stacklevel=2) + return getattr(__hdl, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") \ No newline at end of file diff --git a/amaranth/back/rtlil.py b/amaranth/back/rtlil.py index 37989bf0c..2b2a82305 100644 --- a/amaranth/back/rtlil.py +++ b/amaranth/back/rtlil.py @@ -441,8 +441,8 @@ def emit_cell_wires(self): continue # Instances use one wire per output, not per cell. elif isinstance(cell, (_nir.PriorityMatch, _nir.Matches)): continue # Inlined into assignment lists. - elif isinstance(cell, (_nir.SyncProperty, _nir.AsyncProperty, _nir.Memory, - _nir.SyncWritePort)): + elif isinstance(cell, (_nir.SyncPrint, _nir.AsyncPrint, _nir.SyncProperty, + _nir.AsyncProperty, _nir.Memory, _nir.SyncWritePort)): continue # No outputs. elif isinstance(cell, _nir.AssignmentList): width = len(cell.default) @@ -859,37 +859,78 @@ def emit_read_port(self, cell_idx, cell): }) self.builder.cell(f"$memrd_v2", ports=ports, params=params, src=_src(cell.src_loc)) - def emit_property(self, cell_idx, cell): - if isinstance(cell, _nir.AsyncProperty): - ports = { - "A": self.sigspec(cell.test), - "EN": self.sigspec(cell.en), - } - if isinstance(cell, _nir.SyncProperty): - test = self.builder.wire(1, attrs={"init": _ast.Const(0, 1)}) - en = self.builder.wire(1, attrs={"init": _ast.Const(0, 1)}) - for (d, q) in [ - (cell.test, test), - (cell.en, en), - ]: - ports = { - "D": self.sigspec(d), - "Q": q, - "CLK": self.sigspec(cell.clk), - } - params = { - "WIDTH": 1, - "CLK_POLARITY": { - "pos": True, - "neg": False, - }[cell.clk_edge], - } - self.builder.cell(f"$dff", ports=ports, params=params, src=_src(cell.src_loc)) - ports = { - "A": test, - "EN": en, - } - self.builder.cell(f"${cell.kind}", name=cell.name, ports=ports, src=_src(cell.src_loc)) + def emit_print(self, cell_idx, cell): + args = [] + format = [] + if cell.format is not None: + for chunk in cell.format.chunks: + if isinstance(chunk, str): + format.append(chunk) + else: + spec = _ast.Format._parse_format_spec(chunk.format_desc, _ast.Shape(len(chunk.value), chunk.signed)) + type = spec["type"] + if type == "s": + assert len(chunk.value) % 8 == 0 + for bit in reversed(range(0, len(chunk.value), 8)): + args += chunk.value[bit:bit+8] + else: + args += chunk.value + if type is None: + type = "d" + if type == "x" or type == "X": + # TODO(yosys): "H" type + type = "h" + if type == "s": + # TODO(yosys): support for single unicode character? + type = "c" + width = spec["width"] + align = spec["align"] + if align is None: + align = ">" if type != "c" else "<" + if align == "=": + # TODO(yosys): "=" alignment + align = ">" + fill = spec["fill"] + if fill not in (" ", "0"): + # TODO(yosys): arbitrary fill + fill = " " + # TODO(yosys): support for options, grouping + sign = spec["sign"] + if sign != "+": + # TODO(yosys): support " " sign + sign = "" + if type == "c": + signed = "" + elif chunk.signed: + signed = "s" + else: + signed = "u" + format.append(f"{{{len(chunk.value)}:{align}{fill}{width or ''}{type}{sign}{signed}}}") + ports = { + "EN": self.sigspec(cell.en), + "ARGS": self.sigspec(_nir.Value(args)), + } + params = { + "FORMAT": "".join(format), + "ARGS_WIDTH": len(args), + "PRIORITY": -cell_idx, + } + if isinstance(cell, (_nir.AsyncPrint, _nir.AsyncProperty)): + ports["TRG"] = self.sigspec(_nir.Value()) + params["TRG_ENABLE"] = False + params["TRG_WIDTH"] = 0 + params["TRG_POLARITY"] = 0 + if isinstance(cell, (_nir.SyncPrint, _nir.SyncProperty)): + ports["TRG"] = self.sigspec(cell.clk) + params["TRG_ENABLE"] = True + params["TRG_WIDTH"] = 1 + params["TRG_POLARITY"] = cell.clk_edge == "pos" + if isinstance(cell, (_nir.AsyncPrint, _nir.SyncPrint)): + self.builder.cell(f"$print", params=params, ports=ports, src=_src(cell.src_loc)) + if isinstance(cell, (_nir.AsyncProperty, _nir.SyncProperty)): + params["FLAVOR"] = cell.kind + ports["A"] = self.sigspec(cell.test) + self.builder.cell(f"$check", params=params, ports=ports, src=_src(cell.src_loc)) def emit_any_value(self, cell_idx, cell): self.builder.cell(f"${cell.kind}", ports={ @@ -939,8 +980,8 @@ def emit_cells(self): self.emit_write_port(cell_idx, cell) elif isinstance(cell, (_nir.AsyncReadPort, _nir.SyncReadPort)): self.emit_read_port(cell_idx, cell) - elif isinstance(cell, (_nir.AsyncProperty, _nir.SyncProperty)): - self.emit_property(cell_idx, cell) + elif isinstance(cell, (_nir.AsyncPrint, _nir.SyncPrint, _nir.AsyncProperty, _nir.SyncProperty)): + self.emit_print(cell_idx, cell) elif isinstance(cell, _nir.AnyValue): self.emit_any_value(cell_idx, cell) elif isinstance(cell, _nir.Initial): diff --git a/amaranth/hdl/__init__.py b/amaranth/hdl/__init__.py index 769694946..3c82528a0 100644 --- a/amaranth/hdl/__init__.py +++ b/amaranth/hdl/__init__.py @@ -1,6 +1,7 @@ from ._ast import Shape, unsigned, signed, ShapeCastable, ShapeLike from ._ast import Value, ValueCastable, ValueLike from ._ast import Const, C, Mux, Cat, Array, Signal, ClockSignal, ResetSignal +from ._ast import Format, Print, Assert, Assume, Cover from ._dsl import SyntaxError, SyntaxWarning, Module from ._cd import DomainError, ClockDomain from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance @@ -14,6 +15,7 @@ "Shape", "unsigned", "signed", "ShapeCastable", "ShapeLike", "Value", "ValueCastable", "ValueLike", "Const", "C", "Mux", "Cat", "Array", "Signal", "ClockSignal", "ResetSignal", + "Format", "Print", "Assert", "Assume", "Cover", # _dsl "SyntaxError", "SyntaxWarning", "Module", # _cd diff --git a/amaranth/hdl/_ast.py b/amaranth/hdl/_ast.py index cf08e50df..aa2d46b0f 100644 --- a/amaranth/hdl/_ast.py +++ b/amaranth/hdl/_ast.py @@ -2,12 +2,13 @@ import warnings import functools import operator +import string +import re from collections import OrderedDict from collections.abc import Iterable, MutableMapping, MutableSet, MutableSequence from enum import Enum, EnumMeta from itertools import chain -from ._repr import * from .. import tracer from ..utils import * from .._utils import * @@ -21,8 +22,9 @@ "Signal", "ClockSignal", "ResetSignal", "ValueCastable", "ValueLike", "Initial", + "Format", "Statement", "Switch", - "Property", "Assign", "Assert", "Assume", "Cover", + "Property", "Assign", "Print", "Assert", "Assume", "Cover", "SignalKey", "SignalDict", "SignalSet", ] @@ -337,7 +339,7 @@ def __call__(self, *args, **kwargs): # TODO: write an RFC for turning this into a proper interface method def _value_repr(self, value): - return (Repr(FormatInt(), value),) + return (_repr.Repr(_repr.FormatInt(), value),) class _ShapeLikeMeta(type): @@ -1260,6 +1262,17 @@ def eq(self, value, *, src_loc_at=0): #: assert info == "a signal" __hash__ = None # type: ignore + def __format__(self, format_desc): + """Forbidden formatting. + + Since normal Python formatting (f-strings and ``str.format``) must immediately return + a string, it is unsuitable for formatting Amaranth values. To format a value at simulation + time, use :class:`Format` instead. If you really want to dump the AST at elaboration time, + use ``repr`` instead (for instance, via ``f"{value!r}"``). + """ + raise TypeError(f"Value {self!r} cannot be converted to string. Use `Format` for " + f"simulation-time formatting, or use `repr` to print the AST.") + def _lhs_signals(self): raise TypeError(f"Value {self!r} cannot be used in assignments") @@ -1925,20 +1938,20 @@ def __init__(self, shape=None, *, name=None, init=None, reset=None, reset_less=F self._value_repr = tuple(orig_shape._value_repr(self)) elif isinstance(orig_shape, type) and issubclass(orig_shape, Enum): # A non-Amaranth enum needs a value repr constructed for it. - self._value_repr = (Repr(FormatEnum(orig_shape), self),) + self._value_repr = (_repr.Repr(_repr.FormatEnum(orig_shape), self),) else: # Any other case is formatted as a plain integer. - self._value_repr = (Repr(FormatInt(), self),) + self._value_repr = (_repr.Repr(_repr.FormatInt(), self),) # Compute the value representation that will be used by Amaranth. if decoder is None: - self._value_repr = (Repr(FormatInt(), self),) + self._value_repr = (_repr.Repr(_repr.FormatInt(), self),) self._decoder = None elif not (isinstance(decoder, type) and issubclass(decoder, Enum)): - self._value_repr = (Repr(FormatCustom(decoder), self),) + self._value_repr = (_repr.Repr(_repr.FormatCustom(decoder), self),) self._decoder = decoder else: # Violence. In the name of backwards compatibility! - self._value_repr = (Repr(FormatEnum(decoder), self),) + self._value_repr = (_repr.Repr(_repr.FormatEnum(decoder), self),) def enum_decoder(value): try: return "{0.name:}/{0.value:}".format(decoder(value)) @@ -2299,6 +2312,189 @@ def __repr__(self): return "(initial)" +@final +class Format: + def __init__(self, format, *args, **kwargs): + fmt = string.Formatter() + chunks = [] + used_args = set() + auto_arg_index = 0 + + def get_field(field_name): + nonlocal auto_arg_index + if field_name == "": + if auto_arg_index is None: + raise ValueError("cannot switch from manual field " + "specification to automatic field " + "numbering") + field_name = str(auto_arg_index) + auto_arg_index += 1 + elif field_name.isdigit(): + if auto_arg_index is not None and auto_arg_index > 0: + raise ValueError("cannot switch from automatic field " + "numbering to manual field " + "specification") + auto_arg_index = None + + obj, arg_used = fmt.get_field(field_name, args, kwargs) + used_args.add(arg_used) + return obj + + def subformat(sub_string): + result = [] + for literal, field_name, format_spec, conversion in fmt.parse(sub_string): + result.append(literal) + if field_name is not None: + obj = get_field(field_name) + obj = fmt.convert_field(obj, conversion) + format_spec = subformat(format_spec) + result.append(fmt.format_field(obj, format_spec)) + return "".join(result) + + for literal, field_name, format_spec, conversion in fmt.parse(format): + chunks.append(literal) + if field_name is not None: + obj = get_field(field_name) + obj = fmt.convert_field(obj, conversion) + format_spec = subformat(format_spec) + if isinstance(obj, Value): + # Perform validation. + self._parse_format_spec(format_spec, obj.shape()) + chunks.append((obj, format_spec)) + elif isinstance(obj, ValueCastable): + raise TypeError("'ValueCastable' formatting is not supported") + elif isinstance(obj, Format): + if format_spec != "": + raise ValueError(f"Format specifiers ({format_spec!r}) cannot be used for 'Format' objects") + chunks += obj._chunks + else: + chunks.append(fmt.format_field(obj, format_spec)) + + for i in range(len(args)): + if i not in used_args: + raise ValueError(f"format positional argument {i} was not used") + for name in kwargs: + if name not in used_args: + raise ValueError(f"format keyword argument {name!r} was not used") + + self._chunks = self._clean_chunks(chunks) + + @classmethod + def _from_chunks(cls, chunks): + res = object.__new__(cls) + res._chunks = cls._clean_chunks(chunks) + return res + + @classmethod + def _clean_chunks(cls, chunks): + res = [] + for chunk in chunks: + if isinstance(chunk, str) and chunk == "": + continue + if isinstance(chunk, str) and res and isinstance(res[-1], str): + res[-1] += chunk + else: + res.append(chunk) + return tuple(res) + + def _to_format_string(self): + format_string = [] + args = [] + for chunk in self._chunks: + if isinstance(chunk, str): + format_string.append(chunk.replace("{", "{{").replace("}", "}}")) + else: + arg, format_spec = chunk + args.append(arg) + if format_spec: + format_string.append(f"{{:{format_spec}}}") + else: + format_string.append("{}") + return ("".join(format_string), tuple(args)) + + def __add__(self, other): + if not isinstance(other, Format): + return NotImplemented + return Format._from_chunks(self._chunks + other._chunks) + + def __repr__(self): + format_string, args = self._to_format_string() + args = "".join(f" {arg!r}" for arg in args) + return f"(format {format_string!r}{args})" + + def __format__(self, format_desc): + """Forbidden formatting. + + ``Format`` objects cannot be directly formatted for the same reason as the ``Value``s + they contain. + """ + raise TypeError(f"Format object {self!r} cannot be converted to string. Use `repr` " + f"to print the AST, or pass it to the `Print` statement.") + + _FORMAT_SPEC_PATTERN = re.compile(r""" + (?: + (?P.)? + (?P[<>=^]) + )? + (?P[-+ ])? + (?P[#]?[0]?) + (?P[1-9][0-9]*)? + (?P[_,])? + (?P[bodxXcsn])? + """, re.VERBOSE) + + @staticmethod + def _parse_format_spec(spec: str, shape: Shape): + match = Format._FORMAT_SPEC_PATTERN.fullmatch(spec) + if not match: + raise ValueError(f"Invalid format specifier {spec!r}") + if match["align"] == "^": + raise ValueError(f"Alignment {match['align']!r} is not supported") + if match["grouping"] == ",": + raise ValueError(f"Grouping option {match['grouping']!r} is not supported") + if match["type"] == "n": + raise ValueError(f"Presentation type {match['type']!r} is not supported") + if match["type"] in ("c", "s"): + if shape.signed: + raise ValueError(f"Cannot print signed value with format specifier {match['type']!r}") + if match["align"] == "=": + raise ValueError(f"Alignment {match['align']!r} is not allowed with format specifier {match['type']!r}") + if "#" in match["options"]: + raise ValueError(f"Alternate form is not allowed with format specifier {match['type']!r}") + if "0" in match["options"]: + raise ValueError(f"Zero fill is not allowed with format specifier {match['type']!r}") + if match["sign"] is not None: + raise ValueError(f"Sign is not allowed with format specifier {match['type']!r}") + if match["grouping"] is not None: + raise ValueError(f"Cannot specify {match['grouping']!r} with format specifier {match['type']!r}") + if match["type"] == "s" and shape.width % 8 != 0: + raise ValueError(f"Value width must be divisible by 8 with format specifier {match['type']!r}") + return { + # Single character or None. + "fill": match["fill"], + # '<', '>', '=', or None. Cannot be '=' for types 'c' and 's'. + "align": match["align"], + # '-', '+', ' ', or None. Always None for types 'c' and 's'. + "sign": match["sign"], + # "", "#", "0", or "#0". Always "" for types 'c' and 's'. + "options": match["options"], + # An int. + "width": int(match["width"]) if match["width"] is not None else 0, + # '_' or None. Always None for types 'c' and 's'. + "grouping": match["grouping"], + # 'b', 'o', 'd', 'x', 'X', 'c', 's', or None. + "type": match["type"], + } + + def _rhs_signals(self): + res = SignalSet() + for chunk in self._chunks: + if not isinstance(chunk, str): + obj, format_spec = chunk + res |= obj._rhs_signals() + return res + + class _StatementList(list): def __repr__(self): return "({})".format(" ".join(map(repr, self))) @@ -2350,6 +2546,47 @@ def __repr__(self): return f"(eq {self.lhs!r} {self.rhs!r})" +class UnusedPrint(UnusedMustUse): + pass + + +@final +class Print(Statement, MustUse): + _MustUse__warning = UnusedPrint + + def __init__(self, *args, sep=" ", end="\n", src_loc_at=0): + self._MustUse__silence = True + super().__init__(src_loc_at=src_loc_at) + if not isinstance(sep, str): + raise TypeError(f"'sep' must be a string, not {sep!r}") + if not isinstance(end, str): + raise TypeError(f"'end' must be a string, not {end!r}") + chunks = [] + first = True + for arg in args: + if not first and sep != "": + chunks.append(sep) + first = False + chunks += Format("{}", arg)._chunks + if end != "": + chunks.append(end) + self._message = Format._from_chunks(chunks) + del self._MustUse__silence + + @property + def message(self): + return self._message + + def _lhs_signals(self): + return set() + + def _rhs_signals(self): + return self.message._rhs_signals() + + def __repr__(self): + return f"(print {self.message!r})" + + class UnusedProperty(UnusedMustUse): pass @@ -2363,14 +2600,17 @@ class Kind(Enum): Assume = "assume" Cover = "cover" - def __init__(self, kind, test, *, name=None, src_loc_at=0): + def __init__(self, kind, test, message=None, *, src_loc_at=0): + self._MustUse__silence = True super().__init__(src_loc_at=src_loc_at) self._kind = self.Kind(kind) self._test = Value.cast(test) - self._name = name - if not isinstance(self.name, str) and self.name is not None: - raise TypeError("Property name must be a string or None, not {!r}" - .format(self.name)) + if isinstance(message, str): + message = Format._from_chunks([message]) + if message is not None and not isinstance(message, Format): + raise TypeError(f"Property message must be None, str, or Format, not {message!r}") + self._message = message + del self._MustUse__silence @property def kind(self): @@ -2381,31 +2621,33 @@ def test(self): return self._test @property - def name(self): - return self._name + def message(self): + return self._message def _lhs_signals(self): return set() def _rhs_signals(self): + if self.message is not None: + return self.message._rhs_signals() | self.test._rhs_signals() return self.test._rhs_signals() def __repr__(self): - if self.name is not None: - return f"({self.name}: {self.kind.value} {self.test!r})" + if self.message is not None: + return f"({self.kind.value} {self.test!r} {self.message!r})" return f"({self.kind.value} {self.test!r})" -def Assert(test, *, name=None, src_loc_at=0): - return Property("assert", test, name=name, src_loc_at=src_loc_at+1) +def Assert(test, message=None, *, src_loc_at=0): + return Property("assert", test, message, src_loc_at=src_loc_at+1) -def Assume(test, *, name=None, src_loc_at=0): - return Property("assume", test, name=name, src_loc_at=src_loc_at+1) +def Assume(test, message=None, *, src_loc_at=0): + return Property("assume", test, message, src_loc_at=src_loc_at+1) -def Cover(test, *, name=None, src_loc_at=0): - return Property("cover", test, name=name, src_loc_at=src_loc_at+1) +def Cover(test, message=None, *, src_loc_at=0): + return Property("cover", test, message, src_loc_at=src_loc_at+1) class _LateBoundStatement(Statement): @@ -2617,4 +2859,4 @@ class SignalSet(_MappedKeySet): _unmap_key = lambda self, key: key.signal -from ._repr import * +from . import _repr diff --git a/amaranth/hdl/_dsl.py b/amaranth/hdl/_dsl.py index b753ae15a..a74fd7cc3 100644 --- a/amaranth/hdl/_dsl.py +++ b/amaranth/hdl/_dsl.py @@ -9,7 +9,7 @@ from ..utils import bits_for from .. import tracer from ._ast import * -from ._ast import _StatementList, _LateBoundStatement, Property +from ._ast import _StatementList, _LateBoundStatement, Property, Print from ._ir import * from ._cd import * from ._xfrm import * @@ -184,7 +184,7 @@ def resolve_statement(stmt): src_loc=stmt.src_loc, case_src_locs=stmt.case_src_locs, ) - elif isinstance(stmt, (Assign, Property)): + elif isinstance(stmt, (Assign, Property, Print)): return stmt else: assert False # :nocov: @@ -584,9 +584,9 @@ def _add_statement(self, assigns, domain, depth): self._pop_ctrl() for stmt in Statement.cast(assigns): - if not isinstance(stmt, (Assign, Property, _LateBoundStatement)): + if not isinstance(stmt, (Assign, Property, Print, _LateBoundStatement)): raise SyntaxError( - f"Only assignments and property checks may be appended to d.{domain}") + f"Only assignments, prints, and property checks may be appended to d.{domain}") stmt._MustUse__used = True diff --git a/amaranth/hdl/_ir.py b/amaranth/hdl/_ir.py index 6f2da5262..1f5e0be67 100644 --- a/amaranth/hdl/_ir.py +++ b/amaranth/hdl/_ir.py @@ -222,7 +222,7 @@ def flatten_subfrags_if_needed(subfrags): continue # While we're at it, show a message. - message = ("Signal '{}' is driven from multiple fragments: {}" + message = ("Signal '{!r}' is driven from multiple fragments: {}" .format(signal, ", ".join(subfrag_names))) if mode == "error": raise DriverConflict(message) @@ -972,6 +972,17 @@ def emit_assign(self, module_idx: int, cd: "_cd.ClockDomain | None", lhs: _ast.V else: assert False # :nocov: + def emit_format(self, module_idx, format): + chunks = [] + for chunk in format._chunks: + if isinstance(chunk, str): + chunks.append(chunk) + else: + value, format_desc = chunk + value, signed = self.emit_rhs(module_idx, value) + chunks.append(_nir.FormatValue(value, format_desc, signed=signed)) + return _nir.Format(chunks) + def emit_stmt(self, module_idx: int, fragment: _ir.Fragment, domain: str, stmt: _ast.Statement, cond: _nir.Net): if domain == "comb": @@ -986,6 +997,25 @@ def emit_stmt(self, module_idx: int, fragment: _ir.Fragment, domain: str, if len(rhs) < width: rhs = self.extend(rhs, signed, width) self.emit_assign(module_idx, cd, stmt.lhs, 0, rhs, cond, src_loc=stmt.src_loc) + elif isinstance(stmt, _ast.Print): + en_cell = _nir.AssignmentList(module_idx, + default=_nir.Value.zeros(), + assignments=[ + _nir.Assignment(cond=cond, start=0, value=_nir.Value.ones(), + src_loc=stmt.src_loc) + ], + src_loc=stmt.src_loc) + cond, = self.netlist.add_value_cell(1, en_cell) + format = self.emit_format(module_idx, stmt.message) + if cd is None: + cell = _nir.AsyncPrint(module_idx, en=cond, + format=format, src_loc=stmt.src_loc) + else: + clk, = self.emit_signal(cd.clk) + cell = _nir.SyncPrint(module_idx, en=cond, + clk=clk, clk_edge=cd.clk_edge, + format=format, src_loc=stmt.src_loc) + self.netlist.add_cell(cell) elif isinstance(stmt, _ast.Property): test, _signed = self.emit_rhs(module_idx, stmt.test) if len(test) != 1: @@ -999,14 +1029,18 @@ def emit_stmt(self, module_idx: int, fragment: _ir.Fragment, domain: str, ], src_loc=stmt.src_loc) cond, = self.netlist.add_value_cell(1, en_cell) + if stmt.message is None: + format = None + else: + format = self.emit_format(module_idx, stmt.message) if cd is None: cell = _nir.AsyncProperty(module_idx, kind=stmt.kind.value, test=test, en=cond, - name=stmt.name, src_loc=stmt.src_loc) + format=format, src_loc=stmt.src_loc) else: clk, = self.emit_signal(cd.clk) cell = _nir.SyncProperty(module_idx, kind=stmt.kind.value, test=test, en=cond, - clk=clk, clk_edge=cd.clk_edge, name=stmt.name, - src_loc=stmt.src_loc) + clk=clk, clk_edge=cd.clk_edge, + format=format, src_loc=stmt.src_loc) self.netlist.add_cell(cell) elif isinstance(stmt, _ast.Switch): test, _signed = self.emit_rhs(module_idx, stmt.test) diff --git a/amaranth/hdl/_nir.py b/amaranth/hdl/_nir.py index cdc722963..fdf58c54e 100644 --- a/amaranth/hdl/_nir.py +++ b/amaranth/hdl/_nir.py @@ -6,13 +6,16 @@ __all__ = [ # Netlist core - "Net", "Value", "Netlist", "ModuleNetFlow", "Module", "Cell", "Top", + "Net", "Value", "FormatValue", "Format", + "Netlist", "ModuleNetFlow", "Module", "Cell", "Top", # Computation cells "Operator", "Part", # Decision tree cells "Matches", "PriorityMatch", "Assignment", "AssignmentList", # Storage cells "FlipFlop", "Memory", "SyncWritePort", "AsyncReadPort", "SyncReadPort", + # Print cells + "AsyncPrint", "SyncPrint", # Formal verification cells "Initial", "AnyValue", "AsyncProperty", "SyncProperty", # Foreign interface cells @@ -159,6 +162,57 @@ def __repr__(self): __str__ = __repr__ +class FormatValue: + """A single formatted value within ``Format``. + + Attributes + ---------- + + value: Value + format_desc: str + signed: bool + """ + def __init__(self, value, format_desc, *, signed): + assert isinstance(format_desc, str) + assert isinstance(signed, bool) + self.value = Value(value) + self.format_desc = format_desc + self.signed = signed + + def __repr__(self): + sign = "s" if self.signed else "u" + return f"({sign} {self.value!r} {self.format_desc!r})" + + +class Format: + """Like _ast.Format, but for NIR. + + Attributes + ---------- + + chunks: tuple of str and FormatValue + """ + def __init__(self, chunks): + self.chunks = tuple(chunks) + for chunk in self.chunks: + assert isinstance(chunk, (str, FormatValue)) + + def __repr__(self): + return f"({' '.join(repr(chunk) for chunk in self.chunks)})" + + def input_nets(self): + nets = set() + for chunk in self.chunks: + if isinstance(chunk, FormatValue): + nets |= set(chunk.value) + return nets + + def resolve_nets(self, netlist: "Netlist"): + for chunk in self.chunks: + if isinstance(chunk, FormatValue): + chunk.value = netlist.resolve_value(chunk.value) + + class Netlist: """A fine netlist. Consists of: @@ -837,6 +891,73 @@ def __repr__(self): return f"(read_port {self.memory} {self.width} {self.addr} {self.en} {self.clk_edge} {self.clk} ({transparent_for}))" +class AsyncPrint(Cell): + """Corresponds to ``Print`` in the "comb" domain. + + Attributes + ---------- + + en: Net + format: Format + """ + def __init__(self, module_idx, *, en, format, src_loc): + super().__init__(module_idx, src_loc=src_loc) + + assert isinstance(format, Format) + self.en = Net.ensure(en) + self.format = format + + def input_nets(self): + return {self.en} | self.format.input_nets() + + def output_nets(self, self_idx: int): + return set() + + def resolve_nets(self, netlist: Netlist): + self.en = netlist.resolve_net(self.en) + self.format.resolve_nets(netlist) + + def __repr__(self): + return f"(print {self.en} {self.format!r})" + + +class SyncPrint(Cell): + """Corresponds to ``Print`` in domains other than "comb". + + Attributes + ---------- + + en: Net + clk: Net + clk_edge: str, either 'pos' or 'neg' + format: Format + """ + + def __init__(self, module_idx, *, en, clk, clk_edge, format, src_loc): + super().__init__(module_idx, src_loc=src_loc) + + assert clk_edge in ('pos', 'neg') + assert isinstance(format, Format) + self.en = Net.ensure(en) + self.clk = Net.ensure(clk) + self.clk_edge = clk_edge + self.format = format + + def input_nets(self): + return {self.en, self.clk} | self.format.input_nets() + + def output_nets(self, self_idx: int): + return set() + + def resolve_nets(self, netlist: Netlist): + self.en = netlist.resolve_net(self.en) + self.clk = netlist.resolve_net(self.clk) + self.format.resolve_nets(netlist) + + def __repr__(self): + return f"(print {self.en} {self.clk_edge} {self.clk} {self.format!r})" + + class Initial(Cell): """Corresponds to ``Initial`` value.""" @@ -892,19 +1013,23 @@ class AsyncProperty(Cell): kind: str, either 'assert', 'assume', or 'cover' test: Net en: Net - name: str + format: Format or None """ - def __init__(self, module_idx, *, kind, test, en, name, src_loc): + def __init__(self, module_idx, *, kind, test, en, format, src_loc): super().__init__(module_idx, src_loc=src_loc) + assert format is None or isinstance(format, Format) assert kind in ('assert', 'assume', 'cover') self.kind = kind self.test = Net.ensure(test) self.en = Net.ensure(en) - self.name = name + self.format = format def input_nets(self): - return {self.test, self.en} + if self.format is None: + return {self.test, self.en} + else: + return {self.test, self.en} | self.format.input_nets() def output_nets(self, self_idx: int): return set() @@ -912,9 +1037,11 @@ def output_nets(self, self_idx: int): def resolve_nets(self, netlist: Netlist): self.test = netlist.resolve_net(self.test) self.en = netlist.resolve_net(self.en) + if self.format is not None: + self.format.resolve_nets(netlist) def __repr__(self): - return f"({self.kind} {self.name!r} {self.test} {self.en})" + return f"({self.kind} {self.test} {self.en} {self.format!r})" class SyncProperty(Cell): @@ -928,12 +1055,13 @@ class SyncProperty(Cell): en: Net clk: Net clk_edge: str, either 'pos' or 'neg' - name: str + format: Format or None """ - def __init__(self, module_idx, *, kind, test, en, clk, clk_edge, name, src_loc): + def __init__(self, module_idx, *, kind, test, en, clk, clk_edge, format, src_loc): super().__init__(module_idx, src_loc=src_loc) + assert format is None or isinstance(format, Format) assert kind in ('assert', 'assume', 'cover') assert clk_edge in ('pos', 'neg') self.kind = kind @@ -941,10 +1069,13 @@ def __init__(self, module_idx, *, kind, test, en, clk, clk_edge, name, src_loc): self.en = Net.ensure(en) self.clk = Net.ensure(clk) self.clk_edge = clk_edge - self.name = name + self.format = format def input_nets(self): - return {self.test, self.en, self.clk} + if self.format is None: + return {self.test, self.en, self.clk} + else: + return {self.test, self.en, self.clk} | self.format.input_nets() def output_nets(self, self_idx: int): return set() @@ -953,9 +1084,11 @@ def resolve_nets(self, netlist: Netlist): self.test = netlist.resolve_net(self.test) self.en = netlist.resolve_net(self.en) self.clk = netlist.resolve_net(self.clk) + if self.format is not None: + self.format.resolve_nets(netlist) def __repr__(self): - return f"({self.kind} {self.name!r} {self.test} {self.en} {self.clk_edge} {self.clk})" + return f"({self.kind} {self.test} {self.en} {self.clk_edge} {self.clk} {self.format!r})" class Instance(Cell): diff --git a/amaranth/hdl/_xfrm.py b/amaranth/hdl/_xfrm.py index 02ee2cdac..95505342f 100644 --- a/amaranth/hdl/_xfrm.py +++ b/amaranth/hdl/_xfrm.py @@ -5,7 +5,7 @@ from .._utils import flatten from .. import tracer from ._ast import * -from ._ast import _StatementList, AnyValue, Property +from ._ast import _StatementList, AnyValue from ._cd import * from ._ir import * from ._mem import MemoryInstance @@ -145,6 +145,10 @@ class StatementVisitor(metaclass=ABCMeta): def on_Assign(self, stmt): pass # :nocov: + @abstractmethod + def on_Print(self, stmt): + pass # :nocov: + @abstractmethod def on_Property(self, stmt): pass # :nocov: @@ -166,6 +170,8 @@ def replace_statement_src_loc(self, stmt, new_stmt): def on_statement(self, stmt): if type(stmt) is Assign: new_stmt = self.on_Assign(stmt) + elif type(stmt) is Print: + new_stmt = self.on_Print(stmt) elif type(stmt) is Property: new_stmt = self.on_Property(stmt) elif type(stmt) is Switch: @@ -178,7 +184,7 @@ def on_statement(self, stmt): new_stmt.src_loc = stmt.src_loc if isinstance(new_stmt, Switch) and isinstance(stmt, Switch): new_stmt.case_src_locs = stmt.case_src_locs - if isinstance(new_stmt, Property): + if isinstance(new_stmt, (Print, Property)): new_stmt._MustUse__used = True return new_stmt @@ -190,11 +196,28 @@ class StatementTransformer(StatementVisitor): def on_value(self, value): return value + def on_Format(self, format): + chunks = [] + for chunk in format._chunks: + if isinstance(chunk, str): + chunks.append(chunk) + else: + value, format_spec = chunk + chunks.append((self.on_value(value), format_spec)) + return Format._from_chunks(chunks) + def on_Assign(self, stmt): return Assign(self.on_value(stmt.lhs), self.on_value(stmt.rhs)) + def on_Print(self, stmt): + return Print(self.on_Format(stmt.message), end="") + def on_Property(self, stmt): - return Property(stmt.kind, self.on_value(stmt.test), name=stmt.name) + if stmt.message is None: + message = None + else: + message = self.on_Format(stmt.message) + return Property(stmt.kind, self.on_value(stmt.test), message) def on_Switch(self, stmt): cases = OrderedDict((k, self.on_statement(s)) for k, s in stmt.cases.items()) @@ -386,12 +409,23 @@ def on_ArrayProxy(self, value): def on_Initial(self, value): pass + def on_Format(self, format): + for chunk in format._chunks: + if not isinstance(chunk, str): + value, _format_spec = chunk + self.on_value(value) + def on_Assign(self, stmt): self.on_value(stmt.lhs) self.on_value(stmt.rhs) + def on_Print(self, stmt): + self.on_Format(stmt.message) + def on_Property(self, stmt): self.on_value(stmt.test) + if stmt.message is not None: + self.on_Format(stmt.message) def on_Switch(self, stmt): self.on_value(stmt.test) diff --git a/amaranth/lib/fifo.py b/amaranth/lib/fifo.py index e376ea181..058ad5ba0 100644 --- a/amaranth/lib/fifo.py +++ b/amaranth/lib/fifo.py @@ -1,7 +1,8 @@ """First-in first-out queues.""" from .. import * -from ..asserts import * +from ..hdl import Assume +from ..asserts import Initial from ..utils import ceil_log2 from .coding import GrayEncoder, GrayDecoder from .cdc import FFSynchronizer, AsyncFFSynchronizer diff --git a/amaranth/sim/_pyrtl.py b/amaranth/sim/_pyrtl.py index 93ac452d3..395e8730c 100644 --- a/amaranth/sim/_pyrtl.py +++ b/amaranth/sim/_pyrtl.py @@ -4,7 +4,7 @@ import sys from ..hdl import * -from ..hdl._ast import SignalSet, _StatementList +from ..hdl._ast import SignalSet, _StatementList, Property from ..hdl._xfrm import ValueVisitor, StatementVisitor from ..hdl._mem import MemoryInstance from ._base import BaseProcess @@ -113,6 +113,15 @@ def __init__(self, state, emitter, *, mode, inputs=None): # If not None, `inputs` gets populated with RHS signals. self.inputs = inputs + def sign(self, value): + value_mask = (1 << len(value)) - 1 + masked = f"({value_mask:#x} & {self(value)})" + + if value.shape().signed: + return f"sign({masked}, {-1 << (len(value) - 1):#x})" + else: # unsigned + return masked + def on_Const(self, value): return f"{value.value}" @@ -345,7 +354,31 @@ def gen(arg): return gen +def value_to_string(value): + """Unpack a Verilog-like (but LSB-first) string of unknown width from an integer.""" + msg = bytearray() + while value: + byte = value & 0xff + value >>= 8 + if byte: + msg.append(byte) + return msg.decode() + + +def pin_blame(src_loc, exc): + if src_loc is None: + raise exc + filename, line = src_loc + code = compile("\n" * (line - 1) + "raise exc", filename, "exec") + exec(code, {"exc": exc}) + + class _StatementCompiler(StatementVisitor, _Compiler): + helpers = { + "value_to_string": value_to_string, + "pin_blame": pin_blame, + } + def __init__(self, state, emitter, *, inputs=None, outputs=None): super().__init__(state, emitter) self.rhs = _RHSValueCompiler(state, emitter, mode="curr", inputs=inputs) @@ -358,11 +391,7 @@ def on_statements(self, stmts): self.emitter.append("pass") def on_Assign(self, stmt): - gen_rhs_value = self.rhs(stmt.rhs) # check for oversized value before generating mask - gen_rhs = f"({(1 << len(stmt.rhs)) - 1:#x} & {gen_rhs_value})" - if stmt.rhs.shape().signed: - gen_rhs = f"sign({gen_rhs}, {-1 << (len(stmt.rhs) - 1):#x})" - return self.lhs(stmt.lhs)(gen_rhs) + return self.lhs(stmt.lhs)(self.rhs.sign(stmt.rhs)) def on_Switch(self, stmt): gen_test_value = self.rhs(stmt.test) # check for oversized value before generating mask @@ -387,8 +416,47 @@ def on_Switch(self, stmt): with self.emitter.indent(): self(stmts) + def emit_format(self, format): + format_string = [] + args = [] + for chunk in format._chunks: + if isinstance(chunk, str): + format_string.append(chunk.replace("{", "{{").replace("}", "}}")) + else: + value, format_desc = chunk + value = self.rhs.sign(value) + if format_desc.endswith("s"): + format_desc = format_desc[:-1] + value = f"value_to_string({value})" + format_string.append(f"{{:{format_desc}}}") + args.append(value) + format_string = "".join(format_string) + args = ", ".join(args) + return f"{format_string!r}.format({args})" + + def on_Print(self, stmt): + self.emitter.append(f"print({self.emit_format(stmt.message)}, end='')") + def on_Property(self, stmt): - raise NotImplementedError # :nocov: + if stmt.kind == Property.Kind.Cover: + if stmt.message is not None: + self.emitter.append(f"if {self.rhs.sign(stmt.test)}:") + with self.emitter.indent(): + filename, line = stmt.src_loc + self.emitter.append(f"print(\"Coverage hit at \" {filename!r} \":{line}:\", {self.emit_format(stmt.message)})") + else: + self.emitter.append(f"if not {self.rhs.sign(stmt.test)}:") + with self.emitter.indent(): + if stmt.kind == Property.Kind.Assert: + kind = "Assertion" + elif stmt.kind == Property.Kind.Assume: + kind = "Assumption" + else: + assert False # :nocov: + if stmt.message is not None: + self.emitter.append(f"pin_blame({stmt.src_loc!r}, AssertionError(\"{kind} violated: \" + {self.emit_format(stmt.message)}))") + else: + self.emitter.append(f"pin_blame({stmt.src_loc!r}, AssertionError(\"{kind} violated\"))") @classmethod def compile(cls, state, stmt): @@ -541,7 +609,11 @@ def __call__(self, fragment): else: filename = "" - exec_locals = {"slots": self.state.slots, **_ValueCompiler.helpers} + exec_locals = { + "slots": self.state.slots, + **_ValueCompiler.helpers, + **_StatementCompiler.helpers, + } exec(compile(code, filename, "exec"), exec_locals) domain_process.run = exec_locals["run"] diff --git a/docs/changes.rst b/docs/changes.rst index c398cb561..220c59867 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -33,6 +33,8 @@ Apply the following changes to code written against Amaranth 0.4 to migrate it t * Convert uses of ``Simulator.add_sync_process`` used as testbenches to ``Simulator.add_testbench`` * Convert other uses of ``Simulator.add_sync_process`` to ``Simulator.add_process`` * Replace uses of ``amaranth.hdl.Memory`` with ``amaranth.lib.memory.Memory`` +* Replace imports of ``amaranth.asserts.{Assert, Assume, Cover}`` with imports from ``amaranth.hdl`` +* Remove any usage of ``name=`` with assertions, possibly replacing them with custom messages Implemented RFCs @@ -44,6 +46,7 @@ Implemented RFCs .. _RFC 43: https://amaranth-lang.org/rfcs/0043-rename-reset-to-init.html .. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html .. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html +.. _RFC 50: https://amaranth-lang.org/rfcs/0050-print.html * `RFC 17`_: Remove ``log2_int`` * `RFC 27`_: Testbench processes for the simulator @@ -51,6 +54,7 @@ Implemented RFCs * `RFC 43`_: Rename ``reset=`` to ``init=`` * `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory`` * `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)`` +* `RFC 50`_: ``Print`` statement and string formatting Language changes @@ -60,6 +64,7 @@ Language changes * Added: :class:`ast.Slice` objects have been made const-castable. * Added: :func:`amaranth.utils.ceil_log2`, :func:`amaranth.utils.exact_log2`. (`RFC 17`_) +* Added: :class:`Format` objects, :class:`Print` statements, messages in :class:`Assert`, :class:`Assume` and :class:`Cover`. (`RFC 50`_) * Changed: ``m.Case()`` with no patterns is never active instead of always active. (`RFC 39`_) * Changed: ``Value.matches()`` with no patterns is ``Const(0)`` instead of ``Const(1)``. (`RFC 39`_) * Changed: ``Signal(range(stop), init=stop)`` warning has been changed into a hard error and made to trigger on any out-of range value. @@ -67,11 +72,13 @@ Language changes * Changed: ``Shape.cast(range(1))`` is now ``unsigned(0)``. (`RFC 46`_) * Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_) * Changed: :class:`Shape` has been made immutable and hashable. +* Changed: :class:`Assert`, :class:`Assume`, :class:`Cover` have been moved to :mod:`amaranth.hdl` from :mod:`amaranth.asserts`. (`RFC 50`_) * Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_) * Deprecated: :class:`amaranth.hdl.Memory`. (`RFC 45`_) * Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_) * Removed: (deprecated in 0.4) :class:`Repl`. (`RFC 10`_) * Removed: (deprecated in 0.4) :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`. +* Removed: assertion names in :class:`Assert`, :class:`Assume` and :class:`Cover`. (`RFC 50`_) Standard library changes @@ -89,6 +96,7 @@ Toolchain changes ----------------- * Added: ``Simulator.add_testbench``. (`RFC 27`_) +* Added: support for :class:`amaranth.hdl.Assert` in simulation. (`RFC 50`_) * Deprecated: ``Settle`` simulation command. (`RFC 27`_) * Deprecated: ``Simulator.add_sync_process``. (`RFC 27`_) * Removed: (deprecated in 0.4) use of mixed-case toolchain environment variable names, such as ``NMIGEN_ENV_Diamond`` or ``AMARANTH_ENV_Diamond``; use upper-case environment variable names, such as ``AMARANTH_ENV_DIAMOND``. diff --git a/docs/guide.rst b/docs/guide.rst index adb4c1495..d42972bf6 100644 --- a/docs/guide.rst +++ b/docs/guide.rst @@ -938,6 +938,8 @@ Every signal included in the target of an assignment becomes a part of the domai The answer is no. While this kind of code is occasionally useful, rejecting it greatly simplifies backends, simulators, and analyzers. +In addition to assignments, :ref:`assertions, assumptions `, and :ref:`debug prints ` can be added using the same syntax. + .. _lang-assignorder: @@ -1287,6 +1289,89 @@ Consider the following code: Whenever there is a transition on the clock of the ``sync`` domain, the :py:`timer` signal is incremented by one if :py:`up` is true, decremented by one if :py:`down` is true, and retains its value otherwise. +.. _lang-assert: + +Assertions +========== + +Some properties are so important that if they are violated, the computations described by the design become meaningless. These properties should be guarded with an :class:`Assert` statement that immediately terminates the simulation if its condition is false. Assertions should generally be added to a :ref:`synchronous domain `, and may have an optional message printed when it is violated: + +.. testcode:: + + ip = Signal(16) + m.d.sync += Assert(ip < 128, "instruction pointer past the end of program code!") + +Assertions may be nested within a :ref:`control block `: + +.. testcode:: + :hide: + + booting = Signal() + +.. testcode:: + + with m.If(~booting): + m.d.sync += Assert(ip < 128) + +.. warning:: + + While is is also possible to add assertions to the :ref:`combinatorial domain `, simulations of combinatorial circuits may have *glitches*: instantaneous, transient changes in the values of expressions that are being computed which do not affect the result of the computation (and are not visible in most waveform viewers for that reason). Depending on the tools used for simulation, a glitch in the condition of an assertion or of a :ref:`control block ` that contains it may cause the simulation to be terminated, even if the glitch would have been instantaneously resolved afterwards. + + If the condition of an assertion is assigned in a synchronous domain, then it is safe to add that assertion in the combinatorial domain. For example, neither of the assertions in the example below will be violated due to glitches, regardless of which domain the :py:`ip` and :py:`booting` signals are driven by: + + .. testcode:: + + ip_sync = Signal.like(ip) + m.d.sync += ip_sync.eq(ip) + + m.d.comb += Assert(ip_sync < 128) + with m.If(booting): + m.d.comb += Assert(ip_sync < 128) + + Assertions should be added in a :ref:`synchronous domain ` when possible. In cases where it is not, such as if the condition is a signal that is assigned in a synchronous domain elsewhere, care should be taken while adding the assertion to the combinatorial domain. + + +.. _lang-print: + +Debug printing +============== + +The value of any expression, or of several of them, can be printed to the terminal during simulation using the :class:`Print` statement. When added to the :ref:`combinatorial domain `, the value of an expression is printed whenever it changes: + +.. testcode:: + + state = Signal() + m.d.comb += Print(state) + +When added to a :ref:`synchronous domain `, the value of an expression is printed whenever the active edge occurs on the clock of that domain: + +.. testcode:: + + m.d.sync += Print("on tick: ", state) + +The :class:`Print` statement, regardless of the domain, may be nested within a :ref:`control block `: + +.. testcode:: + + old_state = Signal.like(state) + m.d.sync += old_state.eq(state) + with m.If(state != old_state): + m.d.sync += Print("was: ", old_state, "now: ", state) + +The arguments to the :class:`Print` statement have the same meaning as the arguments to the Python :func:`print` function, with the exception that only :py:`sep` and :py:`end` keyword arguments are supported. In addition, the :class:`Format` helper can be used to apply formatting to the values, similar to the Python :meth:`str.format` method: + +.. testcode:: + + addr = Signal(32) + m.d.sync += Print(Format("address: {:08x}", addr)) + +In both :class:`Print` and :class:`Format`, arguments that are not Amaranth :ref:`values ` are formatted using the usual Python rules. The optional second :py:`message` argument to :class:`Assert` (described :ref:`above `) also accepts a string or the :class:`Format` helper: + +.. testcode:: + + m.d.sync += Assert((addr & 0b111) == 0, message=Format("unaligned address {:08x}!", addr)) + + .. _lang-clockdomains: Clock domains diff --git a/docs/reference.rst b/docs/reference.rst index ba3ef753b..51399a106 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -63,6 +63,9 @@ The prelude exports exactly the following names: * :class:`Signal` * :class:`ClockSignal` * :class:`ResetSignal` +* :class:`Format` +* :class:`Print` +* :func:`Assert` * :class:`Module` * :class:`ClockDomain` * :class:`Elaboratable` diff --git a/tests/test_hdl_ast.py b/tests/test_hdl_ast.py index 12347e15a..5cc6c2a47 100644 --- a/tests/test_hdl_ast.py +++ b/tests/test_hdl_ast.py @@ -1,3 +1,5 @@ +# amaranth: UnusedPrint=no, UnusedProperty + import warnings from enum import Enum, EnumMeta @@ -329,8 +331,6 @@ def test_getitem_wrong(self): r"^Cannot slice value with a value; use Value.bit_select\(\) or Value.word_select\(\) instead$"): Const(31)[s:s+3] - - def test_shift_left(self): self.assertRepr(Const(256, unsigned(9)).shift_left(0), "(cat (const 0'd0) (const 9'd256))") @@ -452,6 +452,12 @@ def test_replicate_repr(self): s = Const(10).replicate(3) self.assertEqual(repr(s), "(cat (const 4'd10) (const 4'd10) (const 4'd10))") + def test_format_wrong(self): + sig = Signal() + with self.assertRaisesRegex(TypeError, + r"^Value \(sig sig\) cannot be converted to string."): + f"{sig}" + class ConstTestCase(FHDLTestCase): def test_shape(self): @@ -1494,6 +1500,151 @@ def test_initial(self): self.assertEqual(i.shape(), unsigned(1)) +class FormatTestCase(FHDLTestCase): + def test_construct(self): + a = Signal() + b = Signal() + c = Signal() + self.assertRepr(Format("abc"), "(format 'abc')") + fmt = Format("{{abc}}") + self.assertRepr(fmt, "(format '{{abc}}')") + self.assertEqual(fmt._chunks, ("{abc}",)) + fmt = Format("{abc}", abc="{def}") + self.assertRepr(fmt, "(format '{{def}}')") + self.assertEqual(fmt._chunks, ("{def}",)) + fmt = Format("a: {a:0{b}}, b: {b}", a=13, b=4) + self.assertRepr(fmt, "(format 'a: 0013, b: 4')") + fmt = Format("a: {a:0{b}x}, b: {b}", a=a, b=4) + self.assertRepr(fmt, "(format 'a: {:04x}, b: 4' (sig a))") + fmt = Format("a: {a}, b: {b}, a: {a}", a=a, b=b) + self.assertRepr(fmt, "(format 'a: {}, b: {}, a: {}' (sig a) (sig b) (sig a))") + fmt = Format("a: {0}, b: {1}, a: {0}", a, b) + self.assertRepr(fmt, "(format 'a: {}, b: {}, a: {}' (sig a) (sig b) (sig a))") + fmt = Format("a: {}, b: {}", a, b) + self.assertRepr(fmt, "(format 'a: {}, b: {}' (sig a) (sig b))") + subfmt = Format("a: {:2x}, b: {:3x}", a, b) + fmt = Format("sub: {}, c: {:4x}", subfmt, c) + self.assertRepr(fmt, "(format 'sub: a: {:2x}, b: {:3x}, c: {:4x}' (sig a) (sig b) (sig c))") + + def test_construct_wrong(self): + a = Signal() + b = Signal(signed(16)) + with self.assertRaisesRegex(ValueError, + r"^cannot switch from manual field specification to automatic field numbering$"): + Format("{0}, {}", a, b) + with self.assertRaisesRegex(ValueError, + r"^cannot switch from automatic field numbering to manual field specification$"): + Format("{}, {1}", a, b) + with self.assertRaisesRegex(TypeError, + r"^'ValueCastable' formatting is not supported$"): + Format("{}", MockValueCastable(Const(0))) + with self.assertRaisesRegex(ValueError, + r"^Format specifiers \('s'\) cannot be used for 'Format' objects$"): + Format("{:s}", Format("")) + with self.assertRaisesRegex(ValueError, + r"^format positional argument 1 was not used$"): + Format("{}", a, b) + with self.assertRaisesRegex(ValueError, + r"^format keyword argument 'b' was not used$"): + Format("{a}", a=a, b=b) + with self.assertRaisesRegex(ValueError, + r"^Invalid format specifier 'meow'$"): + Format("{a:meow}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Alignment '\^' is not supported$"): + Format("{a:^13}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Grouping option ',' is not supported$"): + Format("{a:,}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Presentation type 'n' is not supported$"): + Format("{a:n}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Cannot print signed value with format specifier 'c'$"): + Format("{b:c}", b=b) + with self.assertRaisesRegex(ValueError, + r"^Value width must be divisible by 8 with format specifier 's'$"): + Format("{a:s}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Alignment '=' is not allowed with format specifier 'c'$"): + Format("{a:=13c}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Sign is not allowed with format specifier 'c'$"): + Format("{a:+13c}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Zero fill is not allowed with format specifier 'c'$"): + Format("{a:013c}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Alternate form is not allowed with format specifier 'c'$"): + Format("{a:#13c}", a=a) + with self.assertRaisesRegex(ValueError, + r"^Cannot specify '_' with format specifier 'c'$"): + Format("{a:_c}", a=a) + + def test_plus(self): + a = Signal() + b = Signal() + fmt_a = Format("a = {};", a) + fmt_b = Format("b = {};", b) + fmt = fmt_a + fmt_b + self.assertRepr(fmt, "(format 'a = {};b = {};' (sig a) (sig b))") + self.assertEqual(fmt._chunks[2], ";b = ") + + def test_plus_wrong(self): + with self.assertRaisesRegex(TypeError, + r"^unsupported operand type\(s\) for \+: 'Format' and 'str'$"): + Format("") + "" + + def test_format_wrong(self): + fmt = Format("") + with self.assertRaisesRegex(TypeError, + r"^Format object .* cannot be converted to string."): + f"{fmt}" + + +class PrintTestCase(FHDLTestCase): + def test_construct(self): + a = Signal() + b = Signal() + p = Print("abc") + self.assertRepr(p, "(print (format 'abc\\n'))") + p = Print("abc", "def") + self.assertRepr(p, "(print (format 'abc def\\n'))") + p = Print("abc", b) + self.assertRepr(p, "(print (format 'abc {}\\n' (sig b)))") + p = Print(a, b, end="", sep=", ") + self.assertRepr(p, "(print (format '{}, {}' (sig a) (sig b)))") + p = Print(Format("a: {a:04x}", a=a)) + self.assertRepr(p, "(print (format 'a: {:04x}\\n' (sig a)))") + + def test_construct_wrong(self): + with self.assertRaisesRegex(TypeError, + r"^'sep' must be a string, not 13$"): + Print("", sep=13) + with self.assertRaisesRegex(TypeError, + r"^'end' must be a string, not 13$"): + Print("", end=13) + + +class AssertTestCase(FHDLTestCase): + def test_construct(self): + a = Signal() + b = Signal() + p = Assert(a) + self.assertRepr(p, "(assert (sig a))") + p = Assert(a, "abc") + self.assertRepr(p, "(assert (sig a) (format 'abc'))") + p = Assert(a, Format("a = {}, b = {}", a, b)) + self.assertRepr(p, "(assert (sig a) (format 'a = {}, b = {}' (sig a) (sig b)))") + + def test_construct_wrong(self): + a = Signal() + b = Signal() + with self.assertRaisesRegex(TypeError, + r"^Property message must be None, str, or Format, not \(sig b\)$"): + Assert(a, b) + + class SwitchTestCase(FHDLTestCase): def test_default_case(self): s = Switch(Const(0), {None: []}) diff --git a/tests/test_hdl_dsl.py b/tests/test_hdl_dsl.py index e4fc85174..c3e8c023c 100644 --- a/tests/test_hdl_dsl.py +++ b/tests/test_hdl_dsl.py @@ -87,7 +87,7 @@ def test_d_wrong(self): def test_d_asgn_wrong(self): m = Module() with self.assertRaisesRegex(SyntaxError, - r"^Only assignments and property checks may be appended to d\.sync$"): + r"^Only assignments, prints, and property checks may be appended to d\.sync$"): m.d.sync += Switch(self.s1, {}) def test_comb_wrong(self): diff --git a/tests/test_hdl_ir.py b/tests/test_hdl_ir.py index 8e9eafe53..66e1401ed 100644 --- a/tests/test_hdl_ir.py +++ b/tests/test_hdl_ir.py @@ -214,12 +214,12 @@ def test_tree(self): (cell 1 3 (~ 2.0)) (cell 2 4 (~ 6.0)) (cell 3 4 (assignment_list 1'd0 (1 0:1 1'd1))) - (cell 4 4 (assert None 0.2 3.0)) + (cell 4 4 (assert 0.2 3.0 None)) (cell 5 5 (~ 6.0)) (cell 6 7 (~ 10.0)) (cell 7 7 (~ 0.2)) (cell 8 7 (assignment_list 1'd0 (1 0:1 1'd1))) - (cell 9 7 (assert None 7.0 8.0)) + (cell 9 7 (assert 7.0 8.0 None)) (cell 10 8 (~ 0.2)) ) """) @@ -3146,6 +3146,69 @@ def test_sync(self): ) """) + def test_print(self): + m = Module() + a = Signal(6) + b = Signal(signed(8)) + en = Signal() + m.domains.a = ClockDomain() + m.domains.b = ClockDomain(async_reset=True) + m.domains.c = ClockDomain(reset_less=True, clk_edge="neg") + with m.If(en): + m.d.comb += Print(a, end="") + m.d.comb += Print(b) + m.d.a += Print(a, b) + m.d.b += Print(Format("values: {:02x}, {:+d}", a, b)) + m.d.c += Print("meow") + nl = build_netlist(Fragment.get(m, None), [ + a, b, en, + ClockSignal("a"), ResetSignal("a"), + ClockSignal("b"), ResetSignal("b"), + ClockSignal("c"), + ]) + self.assertRepr(nl, """ + ( + (module 0 None ('top') + (input 'a' 0.2:8) + (input 'b' 0.8:16) + (input 'en' 0.16) + (input 'a_clk' 0.17) + (input 'a_rst' 0.18) + (input 'b_clk' 0.19) + (input 'b_rst' 0.20) + (input 'c_clk' 0.21) + ) + (cell 0 0 (top + (input 'a' 2:8) + (input 'b' 8:16) + (input 'en' 16:17) + (input 'a_clk' 17:18) + (input 'a_rst' 18:19) + (input 'b_clk' 19:20) + (input 'b_rst' 20:21) + (input 'c_clk' 21:22) + )) + (cell 1 0 (matches 0.16 1)) + (cell 2 0 (priority_match 1 1.0)) + (cell 3 0 (assignment_list 1'd0 (2.0 0:1 1'd1))) + (cell 4 0 (print 3.0 ((u 0.2:8 '')))) + (cell 5 0 (assignment_list 1'd0 (2.0 0:1 1'd1))) + (cell 6 0 (print 5.0 ((s 0.8:16 '') '\\n'))) + (cell 7 0 (matches 0.16 1)) + (cell 8 0 (priority_match 1 7.0)) + (cell 9 0 (assignment_list 1'd0 (8.0 0:1 1'd1))) + (cell 10 0 (print 9.0 pos 0.17 ((u 0.2:8 '') ' ' (s 0.8:16 '') '\\n'))) + (cell 11 0 (matches 0.16 1)) + (cell 12 0 (priority_match 1 11.0)) + (cell 13 0 (assignment_list 1'd0 (12.0 0:1 1'd1))) + (cell 14 0 (print 13.0 pos 0.19 ('values: ' (u 0.2:8 '02x') ', ' (s 0.8:16 '+d') '\\n'))) + (cell 15 0 (matches 0.16 1)) + (cell 16 0 (priority_match 1 15.0)) + (cell 17 0 (assignment_list 1'd0 (16.0 0:1 1'd1))) + (cell 18 0 (print 17.0 neg 0.21 ('meow\\n'))) + ) + """) + def test_assert(self): m = Module() i = Signal(6) @@ -3154,11 +3217,11 @@ def test_assert(self): m.domains.c = ClockDomain(reset_less=True, clk_edge="neg") with m.If(i[5]): m.d.comb += Assert(i[0]) - m.d.comb += Assume(i[1], name="a") + m.d.comb += Assume(i[1], "aaa") m.d.a += Assert(i[2]) - m.d.b += Assume(i[3], name="b") - m.d.c += Cover(i[4], name="c") - m.d.comb += Cover(i, name="d") + m.d.b += Assume(i[3], Format("value: {}", i)) + m.d.c += Cover(i[4], "c") + m.d.comb += Cover(i, "d") nl = build_netlist(Fragment.get(m, None), [ i, ClockSignal("a"), ResetSignal("a"), @@ -3186,25 +3249,23 @@ def test_assert(self): (cell 1 0 (matches 0.7 1)) (cell 2 0 (priority_match 1 1.0)) (cell 3 0 (assignment_list 1'd0 (2.0 0:1 1'd1))) - (cell 4 0 (assert None 0.2 3.0)) + (cell 4 0 (assert 0.2 3.0 None)) (cell 5 0 (assignment_list 1'd0 (2.0 0:1 1'd1))) - (cell 6 0 (assume 'a' 0.3 5.0)) + (cell 6 0 (assume 0.3 5.0 ('aaa'))) (cell 7 0 (b 0.2:8)) (cell 8 0 (assignment_list 1'd0 (2.0 0:1 1'd1))) - (cell 9 0 (cover 'd' 7.0 8.0)) + (cell 9 0 (cover 7.0 8.0 ('d'))) (cell 10 0 (matches 0.7 1)) (cell 11 0 (priority_match 1 10.0)) (cell 12 0 (assignment_list 1'd0 (11.0 0:1 1'd1))) - (cell 13 0 (assert None 0.4 12.0 pos 0.8)) + (cell 13 0 (assert 0.4 12.0 pos 0.8 None)) (cell 14 0 (matches 0.7 1)) (cell 15 0 (priority_match 1 14.0)) (cell 16 0 (assignment_list 1'd0 (15.0 0:1 1'd1))) - (cell 17 0 (assume 'b' 0.5 16.0 pos 0.10)) + (cell 17 0 (assume 0.5 16.0 pos 0.10 ('value: ' (u 0.2:8 '')))) (cell 18 0 (matches 0.7 1)) (cell 19 0 (priority_match 1 18.0)) (cell 20 0 (assignment_list 1'd0 (19.0 0:1 1'd1))) - (cell 21 0 (cover 'c' 0.6 20.0 neg 0.12)) - - + (cell 21 0 (cover 0.6 20.0 neg 0.12 ('c'))) ) """) diff --git a/tests/test_lib_coding.py b/tests/test_lib_coding.py index 29ffcee7b..ca1da31c3 100644 --- a/tests/test_lib_coding.py +++ b/tests/test_lib_coding.py @@ -1,5 +1,4 @@ from amaranth.hdl import * -from amaranth.asserts import * from amaranth.sim import * from amaranth.lib.coding import * diff --git a/tests/test_lib_fifo.py b/tests/test_lib_fifo.py index 126d71f5a..6257c92ae 100644 --- a/tests/test_lib_fifo.py +++ b/tests/test_lib_fifo.py @@ -3,7 +3,7 @@ import warnings from amaranth.hdl import * -from amaranth.asserts import * +from amaranth.asserts import Initial, AnyConst from amaranth.sim import * from amaranth.lib.fifo import * from amaranth.lib.memory import * diff --git a/tests/test_sim.py b/tests/test_sim.py index 1afe63b50..bafc820f3 100644 --- a/tests/test_sim.py +++ b/tests/test_sim.py @@ -1,6 +1,8 @@ import os import warnings -from contextlib import contextmanager +from contextlib import contextmanager, redirect_stdout +from io import StringIO +from textwrap import dedent from amaranth._utils import flatten from amaranth.hdl._ast import * @@ -416,7 +418,7 @@ def test_rotate_right(self): class SimulatorIntegrationTestCase(FHDLTestCase): @contextmanager - def assertSimulation(self, module, deadline=None): + def assertSimulation(self, module, *, deadline=None): sim = Simulator(module) yield sim with sim.write_vcd("test.vcd", "test.gtkw"): @@ -1074,6 +1076,104 @@ def process(): self.assertEqual((yield o), 1) sim.add_testbench(process) + def test_print(self): + m = Module() + ctr = Signal(16) + m.d.sync += ctr.eq(ctr + 1) + with m.If(ctr % 3 == 0): + m.d.sync += Print(Format("Counter: {ctr:03d}", ctr=ctr)) + output = StringIO() + with redirect_stdout(output): + with self.assertSimulation(m) as sim: + sim.add_clock(1e-6, domain="sync") + def process(): + yield Delay(1e-5) + sim.add_testbench(process) + self.assertEqual(output.getvalue(), dedent("""\ + Counter: 000 + Counter: 003 + Counter: 006 + Counter: 009 + """)) + + def test_print(self): + def enc(s): + return Cat( + Const(b, 8) + for b in s.encode() + ) + + m = Module() + ctr = Signal(16) + m.d.sync += ctr.eq(ctr + 1) + msg = Signal(8 * 8) + with m.If(ctr == 0): + m.d.comb += msg.eq(enc("zero")) + with m.Else(): + m.d.comb += msg.eq(enc("non-zero")) + with m.If(ctr % 3 == 0): + m.d.sync += Print(Format("Counter: {:>8s}", msg)) + output = StringIO() + with redirect_stdout(output): + with self.assertSimulation(m) as sim: + sim.add_clock(1e-6, domain="sync") + def process(): + yield Delay(1e-5) + sim.add_testbench(process) + self.assertEqual(output.getvalue(), dedent("""\ + Counter: zero + Counter: non-zero + Counter: non-zero + Counter: non-zero + """)) + + def test_assert(self): + m = Module() + ctr = Signal(16) + m.d.sync += ctr.eq(ctr + 1) + m.d.sync += Assert(ctr < 4, Format("Counter too large: {}", ctr)) + with self.assertRaisesRegex(AssertionError, + r"^Assertion violated: Counter too large: 4$"): + with self.assertSimulation(m) as sim: + sim.add_clock(1e-6, domain="sync") + def process(): + yield Delay(1e-5) + sim.add_testbench(process) + + def test_assume(self): + m = Module() + ctr = Signal(16) + m.d.sync += ctr.eq(ctr + 1) + m.d.comb += Assume(ctr < 4) + with self.assertRaisesRegex(AssertionError, + r"^Assumption violated$"): + with self.assertSimulation(m) as sim: + sim.add_clock(1e-6, domain="sync") + def process(): + yield Delay(1e-5) + sim.add_testbench(process) + + def test_cover(self): + m = Module() + ctr = Signal(16) + m.d.sync += ctr.eq(ctr + 1) + cover = Cover(ctr % 3 == 0, Format("Counter: {ctr:03d}", ctr=ctr)) + m.d.sync += cover + m.d.sync += Cover(ctr % 3 == 1) + output = StringIO() + with redirect_stdout(output): + with self.assertSimulation(m) as sim: + sim.add_clock(1e-6, domain="sync") + def process(): + yield Delay(1e-5) + sim.add_testbench(process) + self.assertRegex(output.getvalue(), dedent(r""" + Coverage hit at .*test_sim\.py:\d+: Counter: 000 + Coverage hit at .*test_sim\.py:\d+: Counter: 003 + Coverage hit at .*test_sim\.py:\d+: Counter: 006 + Coverage hit at .*test_sim\.py:\d+: Counter: 009 + """).lstrip()) + class SimulatorRegressionTestCase(FHDLTestCase): def test_bug_325(self):