"""Streamer: a small FSM that drives a ``Sequencer`` through a fixed command list.

On ``stream.stb`` the FSM issues, in order, ENABLE_CONFIG, POLL_STATUS
(repeated while the captured busy field is non-zero), SET_UFM_ADDR, READ_UFM,
DISABLE_CONFIG and BYPASS, and forwards the sequencer's read data to
``stream.data``.
"""

from amaranth import Signal, Module, unsigned
from amaranth.lib.data import ArrayLayout, Struct, Union, View
from amaranth.lib.enum import IntEnum
from amaranth.lib.wiring import Signature, In, Out, Component, connect, \
    flipped

from .page_buffer import SeqSignature as PageBufSignature
from .sequencer import SequencerSignature, EfbWishbone, Sequencer, Name, \
    Operands, ConstantOp, SetUfmAddrData, DisableRefreshOp


# The page-buffer sequencer signature extended with two extra 1-bit members.
StreamerSignature = Signature({
    **PageBufSignature.members,
    # `stall` is never referenced by Streamer.
    "stall": Out(1),  # Unused, for compatibility with Verilog ports.  # noqa: E501
    # NOTE(review): despite the "Unused" remark, Streamer does drive `ready`
    # (high in IDLE, and in BYPASS when `done` is set and `stb` is clear).
    "ready": In(1),  # Unused, for compatibility with Verilog ports.  # noqa: E501
})


class Streamer(Component):
    """Issue a fixed sequence of ``Sequencer`` commands per ``stream.stb``.

    Ports:
        stream: ``StreamerSignature`` (flipped via ``In``).  ``stb`` starts a
            transaction, ``addr`` is written as the SET_UFM_ADDR page field,
            ``data`` mirrors the sequencer read data, ``ack`` mirrors the
            sequencer read strobe during READ_UFM, and ``ready`` is raised
            in IDLE and at the end of BYPASS.
        efb: ``EfbWishbone`` bus, passed through from the internal
            ``Sequencer``.
    """

    stream: In(StreamerSignature)
    efb: Out(EfbWishbone)

    def __init__(self):
        """Create the internal sequencer and the interface used to drive it."""
        super().__init__()
        self.seqmod = Sequencer()
        # Interface object that elaborate() connects to `seqmod.ctl`; the FSM
        # drives and reads the sequencer exclusively through it.
        self._seq = SequencerSignature.create()

    def elaborate(self, plat):
        """Build and return the module containing the sequencer and the FSM.

        ``plat`` is not used.
        """
        m = Module()
        m.submodules.seqmod = self.seqmod

        # Holds the `busy` status field captured in POLL_STATUS_3.
        ufm_busy = Signal(2)
        # High while the current FSM state differs from the state registered
        # on the previous clock edge (assigned at the bottom of this method).
        just_entered = Signal(1)

        connect(m, self.seqmod.ctl, self._seq)
        connect(m, flipped(self.efb), self.seqmod.efb)

        # Read data is forwarded unconditionally; READ_UFM qualifies it with
        # `stream.ack`.
        m.d.comb += self.stream.data.eq(self._seq.rd.data.stream)

        def next_state_if_asserted(stim, state):
            """Transition to ``state`` when ``stim`` is asserted."""
            with m.If(stim):
                m.next = state

        def assert_if_just_entered(sig):
            """Drive ``sig`` from ``just_entered`` (a pulse on state entry)."""
            m.d.comb += sig.eq(just_entered)

        def drive_sequencer_poll_status():
            """Drive the POLL_STATUS command fields shared by POLL_STATUS_1..5."""
            # NOTE(review): op_len/data_len look like byte counts -- confirm
            # against Sequencer.
            m.d.comb += [
                self._seq.cmd.cmd.eq(Name.POLL_STATUS),
                self._seq.cmd.ops.eq(0),
                self._seq.op_len.eq(3),
                self._seq.wr.data.eq(0),
                self._seq.data_len.eq(4),
                self._seq.xfer_is_wr.eq(0)
            ]

        # `fsm` is used after the FSM block to observe `fsm.state`.
        with m.FSM() as fsm:  # noqa: F841
            # Wait for a request; `ready` is held high while idle.
            with m.State("IDLE"):
                m.d.comb += self.stream.ready.eq(1)
                m.d.comb += [
                    self._seq.cmd.cmd.eq(Name.IDLE),
                    self._seq.cmd.ops.constant.eq(0),
                    self._seq.op_len.eq(0),
                    self._seq.wr.data.eq(0),
                    self._seq.data_len.eq(0),
                    self._seq.xfer_is_wr.eq(1)
                ]

                next_state_if_asserted(self.stream.stb, "ENABLE_CONFIG")

            # Each command state below pulses `req` on entry, holds the
            # command fields for the whole state and leaves on `done`.
            with m.State("ENABLE_CONFIG"):
                assert_if_just_entered(self._seq.req)
                m.d.comb += [
                    self._seq.cmd.cmd.eq(Name.ENABLE_CONFIG),
                    self._seq.cmd.ops.eq(0x080000),
                    self._seq.op_len.eq(3),
                    self._seq.wr.data.eq(0),
                    self._seq.data_len.eq(0),
                    self._seq.xfer_is_wr.eq(1)
                ]

                next_state_if_asserted(self._seq.done, "POLL_STATUS_1")

            # Bleh... easier to make each data strobe an individual state.
            # POLL_STATUS_1..4 each advance on one read strobe; only the
            # first state issues the request.
            with m.State("POLL_STATUS_1"):
                assert_if_just_entered(self._seq.req)
                drive_sequencer_poll_status()
                next_state_if_asserted(self._seq.rd.stb, "POLL_STATUS_2")

            with m.State("POLL_STATUS_2"):
                drive_sequencer_poll_status()
                next_state_if_asserted(self._seq.rd.stb, "POLL_STATUS_3")

            with m.State("POLL_STATUS_3"):
                drive_sequencer_poll_status()
                # The `busy` field is captured from the read data presented
                # with this state's strobe.
                with m.If(self._seq.rd.stb):
                    m.d.sync += ufm_busy.eq(self._seq.rd.data.status.busy)
                next_state_if_asserted(self._seq.rd.stb, "POLL_STATUS_4")

            with m.State("POLL_STATUS_4"):
                drive_sequencer_poll_status()
                next_state_if_asserted(self._seq.rd.stb, "POLL_STATUS_5")

            with m.State("POLL_STATUS_5"):
                drive_sequencer_poll_status()
                # On `done`, proceed to SET_UFM_ADDR unless the captured busy
                # field is non-zero, in which case poll again (the later
                # `m.next` assignment takes precedence).
                # NOTE(review): the busy check is taken to be nested under
                # `done`; confirm against the intended indentation.
                with m.If(self._seq.done):
                    m.next = "SET_UFM_ADDR"
                    next_state_if_asserted(ufm_busy, "POLL_STATUS_1")

            with m.State("SET_UFM_ADDR"):
                assert_if_just_entered(self._seq.req)
                m.d.comb += [
                    self._seq.cmd.cmd.eq(Name.SET_UFM_ADDR),
                    self._seq.cmd.ops.eq(0),
                    self._seq.op_len.eq(3),
                    # The requested page comes straight from the stream port.
                    self._seq.wr.data.set_ufm_addr.pages.eq(self.stream.addr),
                    self._seq.wr.data.set_ufm_addr.space.eq(1),
                    self._seq.data_len.eq(4),
                    self._seq.xfer_is_wr.eq(1)
                ]

                next_state_if_asserted(self._seq.done, "READ_UFM")

            with m.State("READ_UFM"):
                assert_if_just_entered(self._seq.req)
                # Each sequencer read strobe is passed on as `stream.ack`.
                m.d.comb += self.stream.ack.eq(self._seq.rd.stb)
                m.d.comb += [
                    self._seq.cmd.cmd.eq(Name.READ_UFM),
                    self._seq.cmd.ops.read_ufm.pages.eq(1),
                    self._seq.cmd.ops.read_ufm.port.eq(1),
                    self._seq.op_len.eq(3),
                    self._seq.wr.data.eq(0),
                    self._seq.data_len.eq(16),
                    self._seq.xfer_is_wr.eq(0)
                ]

                next_state_if_asserted(self._seq.done, "DISABLE_CONFIG")

            with m.State("DISABLE_CONFIG"):
                assert_if_just_entered(self._seq.req)
                m.d.comb += [
                    self._seq.cmd.cmd.eq(Name.DISABLE_CONFIG),
                    self._seq.cmd.ops.disable.eq(0),
                    self._seq.op_len.eq(2),
                    self._seq.wr.data.eq(0),
                    self._seq.data_len.eq(0),
                    self._seq.xfer_is_wr.eq(1)
                ]

                next_state_if_asserted(self._seq.done, "BYPASS")

            with m.State("BYPASS"):
                assert_if_just_entered(self._seq.req)
                # `ready` is raised on completion only when no new request is
                # already pending.
                with m.If(self._seq.done & ~self.stream.stb):
                    m.d.comb += self.stream.ready.eq(1)

                m.d.comb += [
                    self._seq.cmd.cmd.eq(Name.BYPASS),
                    self._seq.cmd.ops.eq(0),
                    self._seq.op_len.eq(0),
                    self._seq.wr.data.eq(0),
                    self._seq.data_len.eq(0),
                    self._seq.xfer_is_wr.eq(1)
                ]

                # On `done`, return to IDLE, or start the next transaction
                # immediately if `stb` is already asserted (the later
                # `m.next` assignment takes precedence).
                # NOTE(review): the `stb` check is taken to be nested under
                # `done`; confirm against the intended indentation.
                with m.If(self._seq.done):
                    m.next = "IDLE"
                    next_state_if_asserted(self.stream.stb, "ENABLE_CONFIG")

        # Registered copy of the FSM state; comparing it with the live state
        # yields the entry pulse consumed by assert_if_just_entered().
        prev_state = Signal.like(fsm.state)
        m.d.sync += prev_state.eq(fsm.state)
        m.d.comb += just_entered.eq(prev_state != fsm.state)

        return m