From 4545bea1ea520050490f09f68885b717fff81657 Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 02:38:39 +0000 Subject: [PATCH 1/8] Add DFL experiment code --- recirq/dfl/__init__.py | 0 recirq/dfl/dfl_1d.py | 258 ++++ recirq/dfl/dfl_2d_second_order_trotter.py | 635 +++++++++ recirq/dfl/dfl_experiment.py | 1429 +++++++++++++++++++++ 4 files changed, 2322 insertions(+) create mode 100644 recirq/dfl/__init__.py create mode 100644 recirq/dfl/dfl_1d.py create mode 100644 recirq/dfl/dfl_2d_second_order_trotter.py create mode 100644 recirq/dfl/dfl_experiment.py diff --git a/recirq/dfl/__init__.py b/recirq/dfl/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/recirq/dfl/dfl_1d.py b/recirq/dfl/dfl_1d.py new file mode 100644 index 00000000..1482718f --- /dev/null +++ b/recirq/dfl/dfl_1d.py @@ -0,0 +1,258 @@ +import os +import pickle +from collections.abc import Sequence +from typing import List + +import cirq +import matplotlib.pyplot as plt +import numpy as np +import numpy.typing as npt +from tqdm import tqdm + + +def trotter_circuit( + grid: Sequence[cirq.GridQubit], + n_cycles: int, + dt: float, + h: float, + mu: float, + two_qubit_gate="cz", +) -> cirq.Circuit: + if two_qubit_gate == "cz": + return _layer_floquet_cz(grid, dt, h, mu) * n_cycles + + elif two_qubit_gate == "cphase": + if n_cycles == 0: + return cirq.Circuit() + if n_cycles == 1: + return _layer_floquet_cphase_first( + grid, dt, h, mu + ) + _layer_floquet_cphase_last_missing_piece(grid, dt, h, mu) + else: + return ( + _layer_floquet_cphase_first(grid, dt, h, mu) + + (n_cycles - 1) * _layer_floquet_cphase_middle(grid, dt, h, mu) + + _layer_floquet_cphase_last_missing_piece(grid, dt, h, mu) + ) + else: + raise ValueError("Two-qubit gate can only be cz or cphase") + + +def _layer_floquet_cz( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + n = len(grid) + moment_rz = [] + moment_rx = [] + moment_h = [] + for i in range(n): + q = grid[i] + if i % 2 == 1: + moment_rz.append(cirq.rz(dt).on(q)) + moment_h.append(cirq.H(q)) + moment_rx.append(cirq.rx(2 * h * dt).on(q)) + else: + moment_rx.append(cirq.rx(2 * mu * dt).on(q)) + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + _change_basis(grid) + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + + _change_basis(grid) + + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + ) + + +def _layer_floquet_cphase_middle( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + n = len(grid) + moment_0 = [] + moment_1 = [] + moment_h = [] + moment_rz = [] + moment_rx = [] + + for i in range(n // 2): + q0 = grid[(2 * i)] + q = grid[2 * i + 1] + q1 = grid[(2 * i + 2) % n] + moment_0.append(cirq.CZ(q0, q)) # left to right + moment_1.append(cirq.cphase(-4 * dt).on(q1, q)) # right to left + + moment_h.append(cirq.H(q)) + + moment_rz.append(cirq.rz(2 * dt).on(q)) + moment_rz.append(cirq.rz(2 * dt).on(q1)) + + for i in range(n): + q = grid[i] + if i % 2 == 1: + moment_rx.append(cirq.rx(2 * h * dt).on(q)) + else: + moment_rx.append(cirq.rx(2 * mu * dt).on(q)) + + moment_rz_new = [] + qubits_covered = [] + for gate in moment_rz: + count = moment_rz.count(gate) + qubit = gate.qubits[0] + if qubit not in qubits_covered: + moment_rz_new.append(cirq.rz(2 * count * dt).on(qubit)) + qubits_covered.append(qubit) + + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_0), + cirq.Moment(moment_h), + cirq.Moment(moment_rz_new), + cirq.Moment(moment_1), + cirq.Moment(moment_h), + cirq.Moment(moment_0), + cirq.Moment(moment_h), + cirq.Moment(moment_rx), + ) + + +def _layer_floquet_cphase_first( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + n = len(grid) + moment_rz = [] + moment_rx = [] + + for i in range(n): + q = grid[i] + if i % 2 == 1: + moment_rz.append(cirq.rz(dt).on(q)) + moment_rx.append(cirq.rx(2 * h * dt).on(q)) + else: + moment_rx.append(cirq.rx(2 * mu * dt).on(q)) + + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + _change_basis(grid) + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + ) + + +def _layer_floquet_cphase_last_missing_piece( + grid: Sequence[cirq.GridQubit], dt: float, h: float, mu: float +) -> cirq.Circuit: + n = len(grid) + moment_rz = [] + for i in range(n): + q = grid[i] + if i % 2 == 1: + moment_rz.append(cirq.rz(dt).on(q)) + + return _change_basis(grid) + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + +def _layer_hadamard(grid: Sequence[cirq.GridQubit], which_qubits="all") -> cirq.Circuit: + moment = [] + n = len(grid) + for i in range(n): + q = grid[i] + + if i % 2 == 1 and (which_qubits == "gauge" or which_qubits == "all"): + moment.append(cirq.H(q)) + + elif i % 2 == 0 and (which_qubits == "matter" or which_qubits == "all"): + moment.append(cirq.H(q)) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + +def _layer_measure(grid: Sequence[cirq.GridQubit]) -> cirq.Circuit: + n = len(grid) + moment = [] + for i in range(n // 2): + moment.append(cirq.measure(grid[2 * i], key="m")) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + +def _change_basis(grid: Sequence[cirq.GridQubit]) -> cirq.Circuit: + """change the basis so that the matter sites encode the gauge charge.""" + n = len(grid) + moment_1 = [] + moment_2 = [] + moment_h = [] + for i in range(0, n // 2): + q1 = grid[(2 * i)] + q2 = grid[2 * i + 1] + q3 = grid[(2 * i + 2) % n] + moment_1.append(cirq.CZ(q1, q2)) + moment_2.append(cirq.CZ(q3, q2)) + moment_h.append(cirq.H(q2)) + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), cirq.Moment(moment_1), cirq.Moment(moment_2), cirq.Moment(moment_h) + ) + + +def _energy_bump_initial_state( + grid, h: float, matter_config: str, excited_qubits: Sequence[cirq.GridQubit] +) -> cirq.Circuit: + """Circuit for energy bump initial state. + It typically consists of single qubit gates and the basis change circuit U_B. + But in this second order implementation, I am removing U_B since + it cancels out with the U_B in the second order trotter circuit.""" + n = len(grid) + theta = np.arctan(h) + moment = [] + for i in range(n): + q = grid[i] + if i % 2 == 1: + if q in excited_qubits: + moment.append(cirq.ry(np.pi + theta).on(q)) + else: + moment.append(cirq.ry(theta).on(q)) + + else: + if matter_config == "single_sector": + moment.append(cirq.H(q)) + return cirq.Circuit.from_moments(moment) + + +def get_1d_dfl_experiment_circuits( + grid: Sequence[cirq.GridQubit], + initial_state: str, + n_cycles: Sequence[int] | npt.NDArray, + excited_qubits: Sequence[cirq.GridQubit], + dt: float, + h: float, + mu: float, + n_instances: int = 10, + two_qubit_gate: str = "cz", + basis="dual", +): + if initial_state == "single_sector": + initial_circuit = _energy_bump_initial_state(grid, h, "single_sector", excited_qubits) + elif initial_state == "superposition": + initial_circuit = _energy_bump_initial_state(grid, h, "superposition", excited_qubits) + else: + raise ValueError("Invalid initial state") + circuits = [] + for n_cycle in tqdm(n_cycles): + print(int(np.max([0, n_cycle - 1]))) + circ = initial_circuit + trotter_circuit(grid, n_cycle, dt, h, mu, two_qubit_gate) + if basis == "lgt": + circ += _change_basis(grid) + elif basis == "dual": + pass + else: + raise ValueError("Invalid option for basis") + for _ in range(n_instances): + + if basis == "lgt": + circ_z = circ + cirq.measure([q for q in grid], key="m") + elif basis == "dual": + circ_z = ( + circ + + _layer_hadamard(grid, "matter") + + cirq.measure([q for q in grid], key="m") + ) + else: + raise ValueError("Invalid option for basis") + circ_x = circ + _layer_hadamard(grid, "all") + cirq.measure([q for q in grid], key="m") + circuits.append(circ_z) + circuits.append(circ_x) + return circuits diff --git a/recirq/dfl/dfl_2d_second_order_trotter.py b/recirq/dfl/dfl_2d_second_order_trotter.py new file mode 100644 index 00000000..0239db70 --- /dev/null +++ b/recirq/dfl/dfl_2d_second_order_trotter.py @@ -0,0 +1,635 @@ +import os +import pickle +from typing import cast, List, Sequence, Set, Tuple + +import cirq +import numpy as np +import numpy.typing as npt +from tqdm import tqdm + + +class LGTDFL: + """ + A class for simulating Disorder-free localization (DFL) on a 2-dimensional Lattice Gauge + Theory (LGT) with Second order trotter dynamics. + + Attributes: + qubit_grid: A list of cirq.GridQubit objects representing the 2D grid. + origin: Top right matter qubit + dt: The time step for the simulation. + h: The gauge field strength. + mu: The matter field strength. + matter_qubits: A sorted list of cirq.GridQubit objects representing matter qubits. + gauge_qubits: A sorted list of cirq.GridQubit objects representing gauge qubits. + all_qubits: A sorted list of all qubits (matter + gauge). + """ + + def __init__( + self, + qubit_grid: Sequence[cirq.GridQubit], + origin: cirq.GridQubit, + dt: float, + h: float, + mu: float, + ): + """Initializes the LGTDFL class. + + Args: + qubit_grid: A list of cirq.GridQubit objects representing the 2D grid. + origin: The starting cirq.GridQubit object for grid generation. + dt: The time step for the simulation. + h: The gauge field strength. + mu: The matter field strength. + """ + self.qubit_grid = qubit_grid + self.origin = origin + self.dt = dt + self.h = h + self.mu = mu + self.matter_qubits, self.gauge_qubits = self.create_lgt_grid() + self.all_qubits = sorted(self.matter_qubits + self.gauge_qubits) + + def get_y_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: + """Returns horizontal neighbors. If matter qubit is provided return gauge qubits and vice + versa.""" + if qubit in self.gauge_qubits: + return [ + q + for q in [ + cirq.GridQubit(qubit.row + 1, qubit.col), + cirq.GridQubit(qubit.row - 1, qubit.col), + ] + if q in self.matter_qubits + ] + else: + return [ + q + for q in [ + cirq.GridQubit(qubit.row + 1, qubit.col), + cirq.GridQubit(qubit.row - 1, qubit.col), + ] + if q in self.gauge_qubits + ] + + def get_x_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: + """Returns vertical neighbors. If matter qubit is provided return gauge qubits and vice + versa.""" + if qubit in self.gauge_qubits: + return [ + q + for q in [ + cirq.GridQubit(qubit.row, qubit.col + 1), + cirq.GridQubit(qubit.row, qubit.col - 1), + ] + if q in self.matter_qubits + ] + else: + return [ + q + for q in [ + cirq.GridQubit(qubit.row, qubit.col + 1), + cirq.GridQubit(qubit.row, qubit.col - 1), + ] + if q in self.gauge_qubits + ] + + def create_lgt_grid(self) -> Tuple[Tuple[cirq.GridQubit, ...], Tuple[cirq.GridQubit, ...]]: + """Creates a lattice gauge theory (LGT) grid with simplified logic. + + Args: + qubit_grid: A list of cirq.GridQubit objects. + origin: The starting cirq.GridQubit object. + + Returns: + A tuple containing two lists: (matter_qubits, gauge_qubits). + """ + + qubit_set = self.qubit_grid + matter_qubits = [self.origin] + gauge_qubits = [] + queue = [self.origin] + + while queue: + current_matter = queue.pop(0) + neighbors = current_matter.neighbors() + + for neighbor in neighbors: + if neighbor in qubit_set and neighbor not in gauge_qubits: + gauge_qubits.append(neighbor) + + # Find matter neighbors for gauge qubits + if neighbor.row == current_matter.row: # Horizontal gauge + horizontal_neighbor = cirq.GridQubit( + neighbor.row, neighbor.col + (neighbor.col - current_matter.col) + ) + if ( + horizontal_neighbor in self.qubit_grid + and horizontal_neighbor not in matter_qubits + ): + matter_qubits.append(horizontal_neighbor) + queue.append(horizontal_neighbor) + else: # Vertical gauge + vertical_neighbor = cirq.GridQubit( + neighbor.row + (neighbor.row - current_matter.row), neighbor.col + ) + if ( + vertical_neighbor in self.qubit_grid + and vertical_neighbor not in matter_qubits + ): + matter_qubits.append(vertical_neighbor) + queue.append(vertical_neighbor) + + return tuple(sorted(matter_qubits)), tuple(sorted(gauge_qubits)) # type: ignore + + def draw_lgt_grid(self): + """Draw the LGT grid.""" + d = {q: 1 for q in self.gauge_qubits} + d.update({q: 0 for q in self.matter_qubits}) + cirq.Heatmap(d).plot() + + def _layer_matter_gauge_x(self) -> cirq.Circuit: + """Single qubit rotations i.e., the mu and h terms.""" + moment = [] + for q in self.matter_qubits: + moment.append(cirq.rx(2 * self.mu * self.dt).on(q)) + for q in self.gauge_qubits: + moment.append(cirq.rx(2 * self.h * self.dt).on(q)) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + def layer_hadamard(self, which_qubits="all") -> cirq.Circuit: + moment = [] + if which_qubits == "gauge": + for q in self.gauge_qubits: + moment.append(cirq.H(q)) + elif which_qubits == "matter": + for q in self.matter_qubits: + moment.append(cirq.H(q)) + elif which_qubits == "all": + for q in self.all_qubits: + moment.append(cirq.H(q)) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + def layer_measure(self) -> cirq.Circuit: + moment = [] + for q in self.all_qubits: + moment.append(cirq.measure(q, key="m")) + return cirq.Circuit.from_moments(cirq.Moment(moment)) + + def trotter_circuit(self, n_cycles, two_qubit_gate="cz_simultaneous"): + if two_qubit_gate == "cz_simultaneous": + return self._layer_floquet_cz_simultaneous() * n_cycles + + elif two_qubit_gate == "cphase_simultaneous": + if n_cycles == 0: + return cirq.Circuit() + if n_cycles == 1: + return ( + self._layer_floquet_cphase_simultaneous_first() + + self._layer_floquet_cphase_simultaneous_last_missing_piece() + ) + else: + return ( + self._layer_floquet_cphase_simultaneous_first() + + (n_cycles - 1) * self._layer_floquet_cphase_simultaneous_middle() + + self._layer_floquet_cphase_simultaneous_last_missing_piece() + ) + + def layer_floquet(self, two_qubit_gate="cz_simultaneous", layer="middle") -> cirq.Circuit: + if two_qubit_gate == "cz_simultaneous": + return self._layer_floquet_cz_simultaneous() + + elif two_qubit_gate == "cphase_simultaneous": + if layer == "middle": + return self._layer_floquet_cphase_simultaneous_middle() + elif layer == "last": + return self._layer_floquet_cphase_simultaneous_last_missing_piece() + elif layer == "first": + return self._layer_floquet_cphase_simultaneous_first() + else: + raise ValueError("Invalid layer option") + else: + raise ValueError("Invalid two_qubit_gate option") + + def _energy_bump_initial_state( + self, + matter_config: str, + excited_qubits: Sequence[cirq.GridQubit], + two_qubit_gate="cz_simultaneous", + ) -> cirq.Circuit: + """Circuit for energy bump initial state. + It typically consists of single qubit gates and the basis change circuit U_B. + But in this second order implementation, I am removing U_B since + it cancels out with the U_B in the second order trotter circuit.""" + theta = np.arctan(self.h) + moment = [] + for q in self.gauge_qubits: + if q in excited_qubits: + moment.append(cirq.ry(np.pi + theta).on(q)) + else: + moment.append(cirq.ry(theta).on(q)) + + for q in self.matter_qubits: + if matter_config == "single_sector": + moment.append(cirq.H(q)) + + return cirq.Circuit.from_moments(moment) + + def _layer_floquet_cz_simultaneous(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian + Second order Trotter U(t) = e^(-itA/2) e^(-itB) e^(-iA/2). + We take A = zZz term and B = Rx terms, so the trotter layer + should look like UB Rz(t/2) UB Rx(t) UB Rz(t/2) UB. + However, UB from previous layer cancels out with UB in the + following layer so the trotter layer now looks like + ....Rz(t/2) UB Rx(t) UB Rz(t/2).... + """ + + moment_rz = [] + moment_rx = [] + moment_h = [] + for q in self.gauge_qubits: + moment_rz.append(cirq.rz(self.dt).on(q)) + moment_h.append(cirq.H(q)) + moment_rx.append(cirq.rx(2 * self.h * self.dt).on(q)) + + for q in self.matter_qubits: + moment_rx.append(cirq.rx(2 * self.mu * self.dt).on(q)) + + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + self._change_basis() + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + + self._change_basis() + + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + ) + + def _layer_floquet_cphase_simultaneous_middle(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian in terms of CPhase + Second order Trotter U(t) = e^(-itA/2) e^(-itB) e^(-iA/2). + After cancelling U_B's the circuit for the trotter layer looks like + Rz(t/2) UB Rx(t) UB Rz(t/2). However we can condense UB Rz(t) UB of two + consecutive layers in terms of Cphase gates bringing 8 CZ gates down to + 4 CZ and 2 CPhase gates. Let's take a repeating layer in the middle of the form + ...UB Rz(t/2) Rz(t/2) UB Rx(t)..... = ...UB Rz(t) UB Rx(t)... + Thus, the first layer should look like Rz(t/2)UB Rx(t)..... + and the last layer is missing ....UB Rz(T/2) + """ + + moment_0x = [] + moment_1x = [] + + moment_0y = [] + moment_1y = [] + + moment_h = [] + + moment_rz1 = [] + moment_rz2 = [] + + moment_rx = [] + + for q in self.gauge_qubits: + nbrs = self.get_x_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0x.append(cirq.CZ(q0, q)) # left to right + moment_1x.append(cirq.cphase(-4 * self.dt).on(q1, q)) # right to left + + moment_h.append(cirq.H(q)) + + moment_rz1.append(cirq.rz(2 * self.dt).on(q)) + moment_rz1.append(cirq.rz(2 * self.dt).on(q1)) + + for q in self.gauge_qubits: + nbrs = self.get_y_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0y.append(cirq.CZ(q0, q)) # top to bottom + moment_1y.append(cirq.cphase(-4 * self.dt).on(q1, q)) # bottom to top + + moment_h.append(cirq.H(q)) + + moment_rz2.append(cirq.rz(2 * self.dt).on(q)) + moment_rz2.append(cirq.rz(2 * self.dt).on(q1)) + + for q in self.gauge_qubits: + moment_rx.append(cirq.rx(2 * self.h * self.dt).on(q)) + + for q in self.matter_qubits: + moment_rx.append(cirq.rx(2 * self.mu * self.dt).on(q)) + + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_0x), + cirq.Moment(moment_0y), + cirq.Moment(moment_h), + cirq.Moment(moment_1x), + cirq.Moment(moment_rz1), + cirq.Moment(moment_1y), + cirq.Moment(moment_rz2), + cirq.Moment(moment_h), + cirq.Moment(moment_0x), + cirq.Moment(moment_0y), + cirq.Moment(moment_h), + cirq.Moment(moment_rx), + ) + + def _layer_floquet_cphase_simultaneous_first(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian in terms of CPhase. + After cancelling U_B's the circuit for the middle trotter layer looks like + ...Rz(t/2) UB Rx(t) UB Rz(t/2).... However we can condense UB Rz(t) UB of two + consecutive layers in terms of Cphase gates bringing 8 CZ gates down to + 4 CZ and 2 CPhase gates. A layer in the middle then takes the form + ...UB Rz(t/2) Rz(t/2) UB Rx(t)..... = ...UB Rz(t) UB Rx(t)... + + However,the first floquet layer for the cphase implementation looks like + Rz(t/2)UB Rx(t).... Here, we will use the CZ implementation of UB. + """ + + moment_rz = [] + moment_rx = [] + + for q in self.gauge_qubits: + moment_rz.append(cirq.rz(self.dt).on(q)) + moment_rx.append(cirq.rx(2 * self.h * self.dt).on(q)) + + for q in self.matter_qubits: + moment_rx.append(cirq.rx(2 * self.mu * self.dt).on(q)) + + return ( + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + self._change_basis() + + cirq.Circuit.from_moments(cirq.Moment(moment_rx)) + ) + + def _layer_floquet_cphase_simultaneous_last_missing_piece(self) -> cirq.Circuit: + """Circuit for a trotter step of the DFL Hamiltonian in terms of CPhase. + After cancelling U_B's the circuit for the trotter layer looks like + Rz(t/2) UB Rx(t) UB Rz(t/2). However we can condense UB Rz(t) UB of two + consecutive layers in terms of Cphase gates bringing 8 CZ gates down to + 4 CZ and 2 CPhase gates. A layer in the middle then takes the form + ...UB Rz(t/2) Rz(t/2) UB Rx(t)..... = ...UB Rz(t) UB Rx(t)... + + However,the last floquet layer for the cphase implementation looks like + ......UB Rz(t/2) Here, we will use the CZ implementation of UB. + """ + moment_rz = [] + for q in self.gauge_qubits: + moment_rz.append(cirq.rz(self.dt).on(q)) + + return self._change_basis() + cirq.Circuit.from_moments(cirq.Moment(moment_rz)) + + def _change_basis(self) -> cirq.Circuit: + """Transform the LGT basis to the dual basis.""" + moment_0x = [] + moment_1x = [] + + moment_0y = [] + moment_1y = [] + + moment_h = [] + + for q in self.gauge_qubits: + nbrs = self.get_x_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0x.append(cirq.CZ(q0, q)) + moment_1x.append(cirq.CZ(q1, q)) + moment_h.append(cirq.H(q)) + + for q in self.gauge_qubits: + nbrs = self.get_y_neighbors(q) + if len(nbrs) == 2: + q0, q1 = nbrs[0], nbrs[1] + moment_0y.append(cirq.CZ(q0, q)) + moment_1y.append(cirq.CZ(q1, q)) + moment_h.append(cirq.H(q)) + + return cirq.Circuit.from_moments( + cirq.Moment(moment_h), + cirq.Moment(moment_0x), + cirq.Moment(moment_1x), + cirq.Moment(moment_0y), + cirq.Moment(moment_1y), + cirq.Moment(moment_h), + ) + + def _interaction_indices(self) -> Sequence[Tuple[int, int, int]]: + indices = [] + for q in self.gauge_qubits: + nbrs = self.get_x_neighbors(q) + if nbrs: + q0, q1 = nbrs[0], nbrs[1] + i0, i, i1 = ( + self.all_qubits.index(q0), + self.all_qubits.index(q), + self.all_qubits.index(q1), + ) + indices.append((i0, i, i1)) + + nbrs = self.get_y_neighbors(q) + if nbrs: + q0, q1 = nbrs[0], nbrs[1] + i0, i, i1 = ( + self.all_qubits.index(q0), + self.all_qubits.index(q), + self.all_qubits.index(q1), + ) + indices.append((i0, i, i1)) + return indices + + def _matter_indices(self) -> Sequence[int]: + indices = [] + for i, q in enumerate(self.all_qubits): + if q in self.matter_qubits: + indices.append(i) + return indices + + def _gauge_indices(self) -> Sequence[int]: + indices = [] + for i, q in enumerate(self.all_qubits): + if q in self.gauge_qubits: + indices.append(i) + return indices + + def _compute_observables_one_instance_dual_basis( + self, bits_z: npt.NDArray[np.int8], bits_x: npt.NDArray[np.int8] + ) -> Sequence[np.ndarray]: + bits_x_rescaled = 1 - 2 * bits_x + bits_z_rescaled = 1 - 2 * bits_z + + gauge_inds = [self.all_qubits.index(q) for q in self.gauge_qubits] + + matter_inds = [self.all_qubits.index(q) for q in self.matter_qubits] + gauge_inds = [self.all_qubits.index(q) for q in self.gauge_qubits] + + exp_gauge_x = np.mean(bits_x_rescaled[:, gauge_inds], axis=0) + var_gauge_x = np.var(bits_x_rescaled[:, gauge_inds], axis=0) + + exp_interaction = np.mean(bits_z_rescaled[:, gauge_inds], axis=0) + var_interaction = np.mean(bits_z_rescaled[:, gauge_inds], axis=0) + + matter_x_terms = [] + # in the dual basis the matter x takes the form + # \sigma^x_j -> \sigma^x_j \prod_{k \in N(j)} X_{jk} + + for q in self.matter_qubits: + prod_x = bits_x_rescaled[:, self.all_qubits.index(q)] + q_nbrs = [q_n for q_n in q.neighbors() if q_n in self.all_qubits] + for q_n in q_nbrs: + prod_x *= bits_x_rescaled[:, self.all_qubits.index(cast(cirq.GridQubit, q_n))] + matter_x_terms.append(prod_x) + + exp_matter_x = np.mean(matter_x_terms, axis=1) + var_matter_x = np.var(matter_x_terms, axis=1) + + exp_energy = [] + var_energy = [] + for _, idx in enumerate(self._interaction_indices()): + q0 = self.all_qubits[idx[0]] + q1 = self.all_qubits[idx[2]] + + w0 = 1 / len([n for n in q0.neighbors() if n in self.gauge_qubits]) + w1 = 1 / len([n for n in q1.neighbors() if n in self.gauge_qubits]) + + mat_1_index = self.matter_qubits.index(self.all_qubits[idx[0]]) + gauge_index = self.gauge_qubits.index(self.all_qubits[idx[1]]) + mat_2_index = self.matter_qubits.index(self.all_qubits[idx[2]]) + + em = ( + exp_interaction[gauge_index] + + self.h * exp_gauge_x[gauge_index] + + self.mu * (w0 * exp_matter_x[mat_1_index] + w1 * exp_matter_x[mat_2_index]) + ) + ev = ( + var_interaction[gauge_index] + + self.h * var_gauge_x[gauge_index] + + self.mu * (w0 * var_matter_x[mat_1_index] + w1 * var_matter_x[mat_2_index]) + ) + exp_energy.append(em) + var_energy.append(ev) + + return [ + np.stack([exp_matter_x, var_matter_x], axis=-1), + np.stack([exp_gauge_x, var_gauge_x], axis=-1), + np.stack([exp_interaction, var_interaction], axis=-1), + np.stack([exp_energy, var_energy], axis=-1), + ] + + def compute_observables_dual_basis( + self, bits_z: Sequence[npt.NDArray[np.int8]], bits_x: Sequence[npt.NDArray[np.int8]] + ) -> Sequence[np.ndarray]: + num_instances = len(bits_z) + if num_instances == 1: + return self._compute_observables_one_instance_dual_basis(bits_z[0], bits_x[0]) + else: + # compute mean and variance of the mean over randomized instances + matter_x = [] + gauge_x = [] + interaction = [] + energy = [] + for ins in range(num_instances): + mx, gx, intr, e = self._compute_observables_one_instance_dual_basis( + bits_z[ins], bits_x[ins] + ) + matter_x.append(mx[:, 0]) + gauge_x.append(gx[:, 0]) + interaction.append(intr[:, 0]) + energy.append(e[:, 0]) + + return [ + np.stack( + [ + np.mean(np.array(matter_x), axis=0), + np.var(np.array(matter_x), axis=0) / num_instances, + ], + axis=-1, + ), + np.stack( + [ + np.mean(np.array(gauge_x), axis=0), + np.var(np.array(gauge_x), axis=0) / num_instances, + ], + axis=-1, + ), + np.stack( + [ + np.mean(np.array(interaction), axis=0), + np.var(np.array(interaction), axis=0) / num_instances, + ], + axis=-1, + ), + np.stack( + [ + np.mean(np.array(energy), axis=0), + np.var(np.array(energy), axis=0) / num_instances, + ], + axis=-1, + ), + ] + + def _postselect_on_charge_one_instance_dual_basis( + self, bits: npt.NDArray[np.int8] + ) -> npt.NDArray[np.int8]: + q_measured = [] + for q in self.matter_qubits: + q_measured.append(1 - 2 * bits[self.all_qubits.index(q)]) + q_measured_array = np.array(q_measured) + q_measured_array = np.transpose(q_measured_array) + charges = np.repeat(1, q_measured_array.shape[0]) + selected = np.all(q_measured_array == charges, axis=0) + return bits[selected] + + def postselect_on_charge_dual_basis(self, bits: npt.NDArray[np.int8]) -> np.ndarray: + num_instances = bits.shape[0] + bits_ps = [] + for ins in range(num_instances): + bits_ps_i = self._postselect_on_charge_one_instance_dual_basis(bits[ins]) + if len(bits_ps_i) > 0: + bits_ps.append(bits_ps_i) + return np.array(bits_ps) + + def get_2d_dfl_experiment_circuits( + self, + initial_state: str, + n_cycles: Sequence[int] | npt.NDArray, + excited_qubits: Sequence[cirq.GridQubit], + n_instances: int = 10, + two_qubit_gate: str = "cz_simultaneous", + basis="dual", + ): + if initial_state == "single_sector": + initial_circuit = self._energy_bump_initial_state("single_sector", excited_qubits) + elif initial_state == "superposition": + initial_circuit = self._energy_bump_initial_state("superposition", excited_qubits) + else: + raise ValueError("Invalid initial state") + circuits = [] + for n_cycle in tqdm(n_cycles): + print(int(np.max([0, n_cycle - 1]))) + circ = initial_circuit + self.trotter_circuit(n_cycle, two_qubit_gate) + if basis == "lgt": + circ += self._change_basis() + elif basis == "dual": + pass + else: + raise ValueError("Invalid option for basis") + for _ in range(n_instances): + if basis == "lgt": + circ_z = circ + cirq.measure([q for q in self.all_qubits], key="m") + elif basis == "dual": + circ_z = ( + circ + + self.layer_hadamard("matter") + + cirq.measure([q for q in self.all_qubits], key="m") + ) + else: + raise ValueError("Invalid option for basis") + circ_x = ( + circ + + self.layer_hadamard("all") + + cirq.measure([q for q in self.all_qubits], key="m") + ) + circuits.append(circ_z) + circuits.append(circ_x) + return circuits diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py new file mode 100644 index 00000000..ded5c034 --- /dev/null +++ b/recirq/dfl/dfl_experiment.py @@ -0,0 +1,1429 @@ +import os +import pickle +from copy import deepcopy +from functools import partial +from typing import cast, Sequence + +import cirq +import cirq.transformers.dynamical_decoupling as dd +import numpy as np +from tqdm import tqdm + +import concurrent.futures +from . import dfl_1d as dfl +from . import dfl_2d_second_order_trotter as dfl_2d + + +def _apply_gauge_compiling(seed: int, circuit: cirq.Circuit) -> cirq.Circuit: + return cirq.merge_single_qubit_moments_to_phxz( + cirq.transformers.gauge_compiling.CPhaseGaugeTransformerMM()(circuit, rng_or_seed=seed) + ) + + +def _distance(q1: cirq.GridQubit, q2: cirq.GridQubit) -> int: + """Return the Manhattan distance between two qubits. + + Args: + q1: The first qubit. + q2: The second qubit. + + Returns: + The distance between the qubits. + """ + return abs(q1.row - q2.row) + abs(q1.col - q2.col) + + +class DFLExperiment: + """A class for performing the 1D DFL experiment (Fig 1 of the paper). + + Attrs: + qubits: The qubits to use for the experiment. + sampler: The cirq sampler to use. + j: The coefficient on the hopping term. + h: The coefficient on the gauge X term. + mu: The coefficient on the matter sigma_x term. + tau: The Trotter step size. + pbc: Whether to use periodic boundary conditions. + rng: The pseudorandom number generator to use for readout benchmarking. + cycles: The number of cycles for which to run the experiment. + num_gc_instances: The number of gauge compiling instances to use. + include_zero_trotter: Whether to include circuits with the Trotter step set to 0. + """ + + def __init__( + self, + qubits: list[cirq.GridQubit], + sampler: cirq.Sampler, + save_directory: str, + j: float = 1.0, + h: float = 1.3, + mu: float = 1.5, + tau: float = 0.25, + pbc: bool = True, + rng: np.random.Generator = np.random.default_rng(), + cycles: np.ndarray = np.arange(31), + num_gc_instances: int = 40, + use_cphase: bool = False, + include_zero_trotter: bool = True, + ): + self.qubits = qubits + self.sampler = sampler + self.j = j + self.h = h + self.mu = mu + self.tau = tau + assert len(qubits) % 2 == 0 + self.pbc = pbc + self.rng = rng + self.cycles = cycles + self.all_circuits: list[cirq.Circuit] = [] + self.e0 = np.array([]) + self.e1 = np.array([]) + self.num_gc_instances = num_gc_instances + self.executor = concurrent.futures.ProcessPoolExecutor() + self.use_cphase = use_cphase + self.readout_ideal_bitstrs = np.array([]) + self.save_directory = save_directory + self.include_zero_trotter = include_zero_trotter + self.matter_sites = np.arange(0, len(qubits), 2) + self.gauge_sites = np.arange(1, len(qubits), 2) + + def save_to_file(self, filename: str): + d_to_save = { + "measurements": self.measurements, + "readout_ideal_bitstrs": self.readout_ideal_bitstrs, + "j": self.j, + "h": self.h, + "mu": self.mu, + "tau": self.tau, + "pbc": self.pbc, + "cycles": self.cycles, + "num_gc_instances": self.num_gc_instances, + "use_cphase": self.use_cphase, + "e0": self.e0, + "e1": self.e1, + "qubits": self.qubits, + "save_directory": self.save_directory, + "include_zero_trotter": self.include_zero_trotter, + } + + pickle.dump(d_to_save, open(filename, "wb")) + + def load_from_file(self, filename: str): + d = pickle.load(open(filename, "rb")) + self.measurements = d["measurements"] + self.readout_ideal_bitstrs = d["readout_ideal_bitstrs"] + self.j = d["j"] + self.h = d["h"] + self.mu = d["mu"] + self.tau = d["tau"] + self.pbc = d["pbc"] + self.cycles = d["cycles"] + self.num_gc_instances = d["num_gc_instances"] + self.use_cphase = d["use_cphase"] + self.e0 = d["e0"] + self.e1 = d["e1"] + self.qubits = d["qubits"] + self.save_directory = d.get("save_directory", "temp") + self.include_zero_trotter = d.get("include_zero_trotter", True) + + def generate_circuit_instances( + self, + initial_state: str, + ncycles: int, + basis: str, + random_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + zero_trotter: bool = False, + ) -> list[cirq.Circuit]: + if initial_state == "gauge_invariant": + initial_state = "single_sector" + + assert initial_state in ["single_sector", "superposition"] + + circuit = dfl.get_1d_dfl_experiment_circuits( + self.qubits, + initial_state, + [ncycles], + [self.qubits[1]], # put the excitation on the first gauge site + 0.0 if zero_trotter else self.tau, + self.h, + self.mu, + 1, + "cphase" if self.use_cphase else "cz", + "dual", + )[["z", "x"].index(basis)] + + if random_compile: + circuits = list( + self.executor.map(partial(_apply_gauge_compiling, circuit=circuit), range(self.num_gc_instances)) + ) + else: + circuits = [circuit] + + if dynamical_decouple: + circuits_dd = self.executor.map( + partial(dd.add_dynamical_decoupling, schema=dd_sequence), circuits + ) + circuits = list(circuits_dd) + + return circuits + + def create_readout_benchmark_circuits( + self, num_random_bitstrings: int = 30 + ) -> tuple[np.ndarray, list[cirq.Circuit]]: + n = len(self.qubits) + x_or_I = lambda bit: cirq.X if bit == 1 else cirq.I + bitstrs = self.rng.integers(0, 2, size=(num_random_bitstrings, n)) + random_circuits = [] + random_circuits = [ + cirq.Circuit( + [x_or_I(bit)(qubit) for bit, qubit in zip(bitstr, self.qubits[: len(bitstr)])] + + [cirq.M(*self.qubits[: len(bitstr)], key="m")] + ) + for bitstr in bitstrs + ] + return bitstrs, random_circuits + + def identify_ideal_readout_bitstrings_from_circuits(self): + # first identify the number of readout bitstrings: + for num_bitstrs, circuit in enumerate(self.all_circuits[::-1]): + if len(circuit) > 2: + break + + bit_fn = lambda gate: 1 if gate == cirq.X else 0 + self.readout_ideal_bitstrs = np.array([ + [bit_fn(circuit[0][qubit].gate) for qubit in self.qubits] + for circuit in self.all_circuits[-num_bitstrs:] + ]) + + def generate_all_circuits( + self, + random_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + num_random_bitstrings: int = 30, + ): + if not random_compile: + self.num_gc_instances = 1 + num_gc_instances = self.num_gc_instances + + zero_trotter_options = [False, True] if self.include_zero_trotter else [False] + + all_circuits = [] + with tqdm( + total=2 * 2 * len(zero_trotter_options) * len(self.cycles) * num_gc_instances + ) as pbar: + for initial_state in ["gauge_invariant", "superposition"]: + for basis in ["z", "x"]: + for zero_trotter in zero_trotter_options: + for ncycles in self.cycles: + all_circuits += self.generate_circuit_instances( + initial_state, + ncycles, + basis, + random_compile, + dynamical_decouple, + dd_sequence, + zero_trotter, + ) + pbar.update(num_gc_instances) + + bitstrs, readout_circuits = self.create_readout_benchmark_circuits(num_random_bitstrings) + all_circuits += readout_circuits + + expected_number = ( + len(self.cycles) * 2 * 2 * len(zero_trotter_options) * num_gc_instances + + num_random_bitstrings + ) + assert len(all_circuits) == expected_number + + self.all_circuits = all_circuits + self.readout_ideal_bitstrs = bitstrs + + def load_circuits_from_file(self, filename: str, old_qubits_list: list[cirq.Qid] | None = None): + """Load the circuits from a file. + + Args: + filename: The filename from which to load the circuits. + old_qubits_list: The previous ordered list of qubits if mapping to different qubits. + """ + circuits = pickle.load(open(filename, "rb")) + if old_qubits_list is not None and old_qubits_list != self.qubits: + self.all_circuits = [ + circuit.transform_qubits(dict(zip(old_qubits_list, self.qubits))) + for circuit in tqdm(circuits, total=len(circuits)) + ] + else: + self.all_circuits = circuits + self.identify_ideal_readout_bitstrings_from_circuits() + + def shuffle_circuits_and_save(self, batch_size: int = 500, delete_circuit_list: bool = True): + """Shuffle all of the circuits and save them to files. + + Args: + batch_size: The maximum number of circuits per file. + delete_circuit_list: Whether to delete the circuit list from this object after saving, saves memory. + """ + if not os.path.isdir(self.save_directory + "/shuffled_circuits"): + os.mkdir(self.save_directory + "/shuffled_circuits") + + # shuffle the circuits: + circuits = self.all_circuits + indices = np.arange(len(circuits)) + self.rng.shuffle(indices) + self.run_order = indices + inv_map = np.array([list(indices).index(_) for _ in range(len(circuits))]) + circuits_shuffled = [circuits[_] for _ in indices] + # save the shuffled circuits + for start_idx in tqdm(range(0, len(circuits), batch_size)): + circuits_i = circuits_shuffled[start_idx : (start_idx + batch_size)] + pickle.dump( + circuits_i, + open(self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "wb"), + ) + + params = { + "indices": indices, + "inv_map": inv_map, + "readout_ideal_bitstrs": self.readout_ideal_bitstrs, + "batch_size": batch_size, + } + pickle.dump(params, open(self.save_directory + "/shuffled_circuits/params.pickle", "wb")) + + if delete_circuit_list: + del self.all_circuits + + def run_experiment( + self, + repetitions_post_selection: int | list[int] = 2000, + repetitions_non_post_selection: int | list[int] = 2000, + batch_size: int = 500, + readout_repetitions: int = 1000, + random_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + num_random_bitstrings: int = 30, + ): + """Run the experiment. First, shuffles and saves the circuits and then runs from the saved + circuits. + + Calls generate_all_circuits, then shuffle_circuits_and_save, then run_experiment_from_saved_circuits, then load_shuffled_measurements. + + Note: These default values are used for the 1D experiment. For the 2D experiment, we use + ``` + repetitions_post_selection = [ + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1136, + 1317, + 1604, + 2267, + 4139, + 5655, + 6579, + 7688, + 11550, + 17332, + 25493, + 37463, + 46275, + 63527, + 69691, + 111862, + 137194, + 227824, + 348209, + 346286, + 406434, + 500000, + 682846, + ], + repetitions_non_post_selection = 1000 + ``` + + Args: + repetitions_post_selection: How many repetitions to use for the gauge invariant initial state at each cycle number. + repetitions_non_post_selection: How many repetitions to use for the superposition initial state. + batch_size: The maximum number of circuits per file and per run_batch call. + random_compile: Whether to add randomized compiling. + dynamical_decouple: Whether to add dynamical decoupling. + dd_sequence: The dynamical decoupling sequence to use. + num_random_bitstrings: The number of bitstrings to use for readout mitigation. + readout_repetitions: The number of repetitions to use for readout benchmarking. + """ + self.generate_all_circuits( + random_compile=random_compile, + dynamical_decouple=dynamical_decouple, + dd_sequence=dd_sequence, + num_random_bitstrings=num_random_bitstrings, + ) + self.shuffle_circuits_and_save(batch_size=batch_size, delete_circuit_list=True) + self.run_experiment_from_saved_circuits( + repetitions_post_selection=repetitions_post_selection, + repetitions_non_post_selection=repetitions_non_post_selection, + initial_start_index=0, + readout_repetitions=readout_repetitions, + ) + self.load_shuffled_measurements() + self.extract_readout_error_rates() + + def run_experiment_from_saved_circuits( + self, + repetitions_post_selection: int | list[int] = 2000, + repetitions_non_post_selection: int | list[int] = 2000, + initial_start_index: int = 0, + old_qubits: list[cirq.Qid] | None = None, + new_qubits: list[cirq.Qid] | None = None, + readout_repetitions: int = 1000, + ): + """Run the experiment from circuits that are saved ahead of time. Saves the results to + files. + + To use this, the circuits should have been saved with shuffle_circuits_and_save. + + The location of the saved files is `self.save_directory + "/shuffled_results"`. + + Note: These default values are used for the 1D experiment. For the 2D experiment, we use + ``` + repetitions_post_selection = [ + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1000, + 1136, + 1317, + 1604, + 2267, + 4139, + 5655, + 6579, + 7688, + 11550, + 17332, + 25493, + 37463, + 46275, + 63527, + 69691, + 111862, + 137194, + 227824, + 348209, + 346286, + 406434, + 500000, + 682846, + ], + repetitions_non_post_selection = 1000 + ``` + + Args: + repetitions_post_selection: How many repetitions to use for the gauge invariant initial state at each cycle number. + repetitions_non_post_selection: How many repetitions to use for the superposition initial state. + initial_start_index: The circuit number to start at (inteneded for resuming datataking if it crashed before). + old_qubits: The order of qubits for which the circuits were originally generated. + new_qubits: The order of qubits to use now. + readout_repetitions: The number of repetitions to use for readout benchmarking. + """ + + # convert to lists: + repetitions_post_selection_list = list( + np.zeros(len(self.cycles), dtype=int) + repetitions_post_selection + ) + repetitions_non_post_selection_list = list( + np.zeros(len(self.cycles), dtype=int) + repetitions_non_post_selection + ) + + # load the parameters for the shuffled circuits: + params = pickle.load(open(self.save_directory + "/shuffled_circuits/params.pickle", "rb")) + indices = params["indices"] + inv_map = params["inv_map"] + self.readout_ideal_bitstrs = params["readout_ideal_bitstrs"] + batch_size = params["batch_size"] + self.run_order = indices + num_random_bitstrings = len(self.readout_ideal_bitstrs) + zero_trotter_options = [0, 1] if self.include_zero_trotter else [0] + tot_num_circuits = ( + len(self.cycles) * 2 * 2 * len(zero_trotter_options) * self.num_gc_instances + + num_random_bitstrings + ) + + repetitions = [readout_repetitions] * tot_num_circuits + + # fill in other repetition numbers + for init_state_number, reps_list in [ + (0, repetitions_post_selection_list), + (1, repetitions_non_post_selection_list), + ]: + for basis_number in [0, 1]: + for zero_trotter_number in zero_trotter_options: + for cycle_number in self.cycles: + start_index = ( + init_state_number + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + basis_number + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + zero_trotter_number * len(self.cycles) * self.num_gc_instances + + cycle_number * self.num_gc_instances + ) + end_index = start_index + self.num_gc_instances + repetitions[start_index:end_index] = [ + reps_list[cycle_number] + ] * self.num_gc_instances + + # now apply the shuffle + repetitions = [repetitions[index] for index in indices] + + # run the experiment and save shuffled measurements to files (avoids memory issues) + for start_idx in tqdm(range(initial_start_index, tot_num_circuits, batch_size)): + circuits_i = pickle.load( + open(self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "rb") + ) + if old_qubits is not None and new_qubits is not None: + print("Transforming qubits") + circuits_i = [ + circuit.transform_qubits(dict(zip(old_qubits, new_qubits))) + for circuit in circuits_i + ] + print(f"Shots this batch: {sum(repetitions[start_idx : (start_idx + batch_size)])}") + result = self.sampler.run_batch( + circuits_i, repetitions=repetitions[start_idx : (start_idx + batch_size)] + ) + results_i = [res[0].measurements["m"].astype(bool) for res in result] + if not os.path.isdir(self.save_directory + f"/shuffled_results"): + os.mkdir(self.save_directory + f"/shuffled_results") + pickle.dump( + results_i, open(self.save_directory + f"/shuffled_results/{start_idx}.pickle", "wb") + ) + + def load_shuffled_measurements(self): + """Loads the measurement results from files. + + First, you should have run `run_experiment_from_saved_circuits` or `run_experiment`. + """ + params = pickle.load(open(self.save_directory + "/shuffled_circuits/params.pickle", "rb")) + indices = params["indices"] + inv_map = params["inv_map"] + self.readout_ideal_bitstrs = params["readout_ideal_bitstrs"] + batch_size = params["batch_size"] + num_random_bitstrings = len(self.readout_ideal_bitstrs) + zero_trotter_options = [False, True] if self.include_zero_trotter else [False] + tot_num_circuits = ( + len(self.cycles) * 2 * 2 * len(zero_trotter_options) * self.num_gc_instances + + num_random_bitstrings + ) + measurements_shuffled = [] + for start_idx in tqdm(range(0, tot_num_circuits, batch_size)): + measurements_shuffled += pickle.load( + open(self.save_directory + f"/shuffled_results/{start_idx}.pickle", "rb") + ) + + self.measurements = [measurements_shuffled[i] for i in inv_map] + + def extract_readout_error_rates(self) -> None: + ideal_bitstrs = self.readout_ideal_bitstrs + num_bitstrs = len(ideal_bitstrs) + readout_measurements = np.array(self.measurements[-num_bitstrs:]) + repetitions = len(readout_measurements[0]) + num_prep_0 = np.sum(1 - ideal_bitstrs, axis=0) + num_prep_1 = np.sum(ideal_bitstrs, axis=0) + self.e0 = np.einsum("ik,ijk->k", (1 - ideal_bitstrs), readout_measurements) / ( + num_prep_0 * repetitions + ) + self.e1 = np.einsum("ik,ijk->k", ideal_bitstrs, (1 - readout_measurements)) / ( + num_prep_1 * repetitions + ) + + return None + + def extract_measurements( + self, + basis_number: int, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + post_select: bool = False, + ) -> np.ndarray: + """Extract the portion of the measurement results corresponding to the requested data. + + Args: + basis_number: 0 for z-basis and 1 for x-basis. + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + + Returns: + The measurement results. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + if initial_state == "single_sector": + initial_state = "gauge_invariant" + init_state_number = ["gauge_invariant", "superposition"].index(initial_state) + zero_trotter_options = [False, True] if self.include_zero_trotter else [False] + zero_trotter_number = zero_trotter_options.index(zero_trotter) + + start_index = ( + init_state_number + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + + basis_number * len(zero_trotter_options) * len(self.cycles) * self.num_gc_instances + + zero_trotter_number * len(self.cycles) * self.num_gc_instances + + cycle_number * self.num_gc_instances + ) + + measurements = np.array( + self.measurements[start_index : (start_index + self.num_gc_instances)] + ) + repetitions = len(measurements[0]) + measurements = measurements.reshape(repetitions * self.num_gc_instances, len(self.qubits)) + if post_select: + mask = np.all(measurements[:, self.matter_sites] == False, axis=1) + measurements = measurements[mask, :] + return measurements + + def extract_zzz_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the interaction term (ZZZ in the LGT basis but implemented + here as Z_gauge in the dual basis). + + Args: + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 0, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + raise NotImplementedError + else: + p1 = np.mean(measurements[:, self.gauge_sites], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.gauge_sites] + e1 = self.e1[self.gauge_sites] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_interaction( + self, + initial_state: str, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ): + """Get the expectation value of the interaction term (ZZZ in the LGT basis but implemented + here as Z_gauge in the dual basis) at all cycles. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [cycle_number, value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + return np.array([ + self.extract_zzz_single_cycle( + initial_state, + cycle, + zero_trotter, + readout_mitigate, + post_select, + tolerated_distance_to_error, + ) + for cycle in tqdm(self.cycles, total=len(self.cycles)) + ]) + + def extract_gauge_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the h term, which is X_gauge in both the LGT and dual bases. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + raise NotImplementedError + else: + p1 = np.mean(measurements[:, self.gauge_sites], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.gauge_sites] + e1 = self.e1[self.gauge_sites] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_gauge_x( + self, + initial_state: str, + readout_mitigate: bool = True, + post_select: bool = False, + zero_trotter: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the h term, which is X_gauge in both the LGT and dual bases + at all cycle numbers. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [cycle_number, value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + return np.array([ + self.extract_gauge_x_single_cycle( + initial_state, + cycle, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + tolerated_distance_to_error=tolerated_distance_to_error, + ) + for cycle in self.cycles + ]) + + def extract_matter_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the mu term. + + This is X_matter in the LGT basis and a product of Xs on a matter site and the neighboring gauge sites in the dual basis. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + x = [] + dx = [] + e0 = deepcopy(self.e0) + e1 = deepcopy(self.e1) + if post_select and readout_mitigate: + for idx in self.matter_sites: + e0[idx] = 0.0 + e1[idx] = 0.0 + for q_idx in self.matter_sites: + qubits_i = [ + self.qubits[q_idx - 1], + self.qubits[q_idx], + self.qubits[(q_idx + 1) % len(self.qubits)], + ] + indices = np.array([self.qubits.index(cast(cirq.GridQubit, qi)) for qi in qubits_i]) + if post_select and tolerated_distance_to_error < np.inf: + raise NotImplementedError + else: + measurements_i = measurements[:, indices] + if readout_mitigate: + single_qubit_cmats = [] + qubits_to_mitigate = [] + + for i in indices: + e0_sq = e0[i] + e1_sq = e1[i] + single_qubit_cmats.append(np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]])) + qubits_to_mitigate.append(self.qubits[i]) + + tcm = cirq.experiments.TensoredConfusionMatrices(single_qubit_cmats, [[q] for q in qubits_to_mitigate], repetitions=measurements_i.shape[0], timestamp = 0.0) + + x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated(qubits_to_mitigate, measurements_i) + else: + repetitions = measurements_i.shape[0] + p1 = np.mean(np.sum(measurements_i, axis=1) % 2) + dp1 = np.sqrt(p1 * (1 - p1) / (repetitions * self.num_gc_instances)) + x_i = 1 - 2 * p1 + dx_i = 2 * dp1 + x.append(x_i) + dx.append(dx_i) + return np.array([x, dx]) + + def extract_matter_x( + self, + initial_state: str, + readout_mitigate: bool = True, + post_select: bool = False, + zero_trotter: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the mu term at all cycles. + + This is X_matter in the LGT basis and a product of Xs on a matter site and the neighboring gauge sites in the dual basis. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [cycle_number, value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + return np.array([ + self.extract_matter_x_single_cycle( + initial_state, + cycle, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + tolerated_distance_to_error=tolerated_distance_to_error, + ) + for cycle in self.cycles + ]) + + +class DFLExperiment2D(DFLExperiment): + """A class for performing the 2D DFL experiment (Fig 3 of the paper). + + Attrs: + sampler: The cirq sampler to use. + qubits: The grid of qubits. + origin_qubit: One of the matter qubits. + h: The coefficient on the gauge X term. + mu: The coefficient on the matter sigma_x term. + tau: The Trotter step size. + rng: The pseudorandom number generator to use for readout benchmarking. + cycles: The number of cycles for which to run the experiment. + num_gc_instances: The number of randomized compiling instances to use. + excited_qubits: Which qubits to excite. + use_cphase: Whether to use cphase gates. + include_zero_trotter: Whether to include circuits with the Trotter step set to 0. + """ + + def __init__( + self, + sampler: cirq.Sampler, + qubits: list[cirq.GridQubit], + origin_qubit: cirq.GridQubit, + save_directory: str, + h: float = 1.5, + mu: float = 3.5, + tau: float = 0.35, + rng: np.random.Generator = np.random.default_rng(), + cycles: np.ndarray = np.arange(31), + num_gc_instances: int = 40, + excited_qubits: list[cirq.GridQubit] = [cirq.GridQubit(0, 7)], + use_cphase: bool = True, + include_zero_trotter: bool = True, + ): + self.sampler = sampler + self.h = h + self.mu = mu + self.tau = tau + self.rng = rng + self.cycles = cycles + self.all_circuits: list[cirq.Circuit] = [] + self.e0 = np.array([]) + self.e1 = np.array([]) + self.num_gc_instances = num_gc_instances + self.executor = process_pool.get_executor() + self.readout_ideal_bitstrs = np.array([]) + self.save_directory = save_directory + self.lgtdfl = dfl_2d.LGTDFL(qubits, origin_qubit, tau, h, mu) + self.lgtdfl_zero_trotter = dfl_2d.LGTDFL(qubits, origin_qubit, 0.0, h, mu) + self.qubits = self.lgtdfl.all_qubits + self.excited_qubits = excited_qubits + self.use_cphase = use_cphase + self.origin_qubit = origin_qubit + self.include_zero_trotter = include_zero_trotter + self.matter_sites = np.array(self.lgtdfl._matter_indices()) + self.gauge_sites = np.array(self.lgtdfl._gauge_indices()) + + def save_to_file(self, filename: str): + """Save a dictionary containing parameters and measurements. + + Args: + filename: The filename to save to. + """ + + d_to_save = { + "measurements": self.measurements, + "readout_ideal_bitstrs": self.readout_ideal_bitstrs, + "h": self.h, + "mu": self.mu, + "tau": self.tau, + "cycles": self.cycles, + "num_gc_instances": self.num_gc_instances, + "e0": self.e0, + "e1": self.e1, + "qubits": self.qubits, + "save_directory": self.save_directory, + "excited_qubits": self.excited_qubits, + "use_cphase": self.use_cphase, + "origin_qubit": self.origin_qubit, + "include_zero_trotter": self.include_zero_trotter, + } + + pickle.dump(d_to_save, open(filename, "wb")) + + def load_from_file(self, filename: str): + """Load parameters and measurements from a dictionary. + + Args: + filename: The filename to load from. + """ + + d = pickle.load(open(filename, "rb")) + self.measurements = d["measurements"] + self.readout_ideal_bitstrs = d["readout_ideal_bitstrs"] + self.h = d["h"] + self.mu = d["mu"] + self.tau = d["tau"] + self.cycles = d["cycles"] + self.num_gc_instances = d["num_gc_instances"] + self.e0 = d["e0"] + self.e1 = d["e1"] + self.qubits = d["qubits"] + self.save_directory = d.get("save_directory", "temp") + self.excited_qubits = d["excited_qubits"] + self.use_cphase = d["use_cphase"] + self.origin_qubit = d["origin_qubit"] + self.include_zero_trotter = d.get("include_zero_trotter", True) + + def generate_circuit_instances( + self, + initial_state: str, + ncycles: int, + basis: str, + random_compile: bool = True, + dynamical_decouple: bool = True, + dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), + zero_trotter: bool = False, + qubits_to_fix_disorder: list[cirq.Qid] = [], + disorder_pattern: list[bool] = [], + ) -> list[cirq.Circuit]: + """Generate the circuit instances for a given initial state, number of cycles, and + measurement basis. + + Args: + initial_state: Which initial state, either "gauge_invariant" or "superposition". + ncycles: The number of Trotter steps (can be 0). + basis: The basis in which to measure. Either "x" or "z". + random_compile: Whether to apply randomized compiling. + dynamical_decouple: Whether to apply dynamical decoupling. + dd_sequence: The dynamical decoupling sequence to use. + zero_trotter: Whether to set the trotter step size to 0 (used to calibrate error mitigation). + qubits_to_fix_disorder: Qubits on which to fix a disorder pattern for the superposition initial state. + disorder_pattern: The disorder pattern to fix. + + Returns: + A list of the circuit instances. + + Raises: + ValueError: If initial_state, ncycles, or basis is not valid. + """ + + for q in qubits_to_fix_disorder: + assert q in self.lgtdfl.matter_qubits, ( + "qubits_to_fix_disorder must all be matter qubits" + ) + + if initial_state not in ["gauge_invariant", "single_sector", "superposition"]: + raise ValueError( + "initial_state should be 'gauge_invariant' or 'superposition' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if ncycles < 0: + raise ValueError("ncycles must be nonnegative") + if basis not in ["z", "x"]: + raise ValueError("basis must be 'z' or 'x'") + + basis_index = ["z", "x"].index(basis) + if initial_state == "gauge_invariant": + initial_state = "single_sector" + if zero_trotter: + lgtdfl = self.lgtdfl_zero_trotter + else: + lgtdfl = self.lgtdfl + circuit = lgtdfl.get_2d_dfl_experiment_circuits( + initial_state, + [ncycles], + excited_qubits=self.excited_qubits, + n_instances=1, + two_qubit_gate="cphase_simultaneous" if self.use_cphase else "cz_simultaneous", + )[basis_index] + new_moment_0 = cirq.Moment( + list(circuit[0].operations) + + [ + cirq.Ry(rads=(1 - 2 * disorder) * np.pi / 2)(q) + for q, disorder in zip(qubits_to_fix_disorder, disorder_pattern) + ] + ) + circuit = cirq.Circuit(new_moment_0) + circuit[1:] + + if random_compile: + circuits = list( + self.executor.map( + partial(_apply_gauge_compiling, circuit=circuit), + range(self.num_gc_instances), + ) + ) + else: + circuits = [circuit] + + if dynamical_decouple: + circuits_dd = list( + self.executor.map( + partial(cirq.add_dynamical_decoupling, schema=dd_sequence), circuits + ) + ) + circuits = circuits_dd + + return circuits + + def post_select_measurements_on_distance( + self, + measurements: np.ndarray, + tolerated_distance_to_error: int | float, + operator_qubits: list[cirq.GridQubit], + ) -> np.ndarray: + """Return the subset of measurements where the errors occur at least a given distance from + the operator qubits. + + Args: + measurements: The non-post-selected measurements. + tolerated_distance_to_error: Allow errors greater than this distance from operator_qubits. + operator_qubits: The qubits on which we are measuring an operator. + + Returns: + The post-selected measurements. + """ + matter_qubits = self.lgtdfl.matter_qubits + matter_indices = np.array(self.lgtdfl._matter_indices()) + distances = np.array([ + min(_distance(q_operator, qubit) for q_operator in operator_qubits) + for qubit in matter_qubits + ]) + errors = measurements[:, matter_indices] + distance_to_error = np.min(np.where(errors, distances, np.inf), axis=1) + return measurements[distance_to_error >= tolerated_distance_to_error] + + def extract_zzz_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the interaction term (ZZZ in the LGT basis but implemented + here as Z_gauge in the dual basis). + + Args: + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 0, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + p1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + dp1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + for idx, (op_qubit, op_index) in enumerate( + zip(self.lgtdfl.gauge_qubits, self.lgtdfl._gauge_indices()) + ): + measurements_q = self.post_select_measurements_on_distance( + measurements, tolerated_distance_to_error, [op_qubit] + )[:, op_index] + print( + f"Interaction term, cycle {cycle_number}, qubit {op_qubit}, {len(measurements_q)} counts of {len(measurements)}" + ) + if len(measurements_q) == 0: + p1[idx] = np.nan + dp1[idx] = np.nan + else: + p1[idx] = np.mean(measurements_q) + dp1[idx] = np.sqrt(p1[idx] * (1 - p1[idx]) / len(measurements_q)) + else: + p1 = np.mean(measurements[:, self.lgtdfl._gauge_indices()], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.lgtdfl._gauge_indices()] + e1 = self.e1[self.lgtdfl._gauge_indices()] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_gauge_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the h term, which is X_gauge in both the LGT and dual bases. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + if post_select and tolerated_distance_to_error < np.inf: + p1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + dp1 = np.zeros(len(self.lgtdfl.gauge_qubits)) + for idx, (op_qubit, op_index) in enumerate( + zip(self.lgtdfl.gauge_qubits, self.lgtdfl._gauge_indices()) + ): + measurements_q = self.post_select_measurements_on_distance( + measurements, tolerated_distance_to_error, [op_qubit] + )[:, op_index] + print( + f"Gauge X term, cycle {cycle_number}, qubit {op_qubit}, {len(measurements_q)} counts of {len(measurements)}" + ) + if len(measurements_q) == 0: + p1[idx] = np.nan + dp1[idx] = np.nan + else: + p1[idx] = np.mean(measurements_q) + dp1[idx] = np.sqrt(p1[idx] * (1 - p1[idx]) / len(measurements_q)) + else: + p1 = np.mean(measurements[:, self.lgtdfl._gauge_indices()], axis=0) + dp1 = np.sqrt(p1 * (1 - p1) / len(measurements)) + if not readout_mitigate: + x_dx = np.array([1 - 2 * p1, 2 * dp1]) # value, uncertainty + else: + x_raw = 1 - 2 * p1 + e0 = self.e0[self.lgtdfl._gauge_indices()] + e1 = self.e1[self.lgtdfl._gauge_indices()] + x_mitigated = (x_raw + e0 - e1) / ( + 1.0 - e0 - e1 + ) # see Eq. F3 of https://arxiv.org/pdf/2106.01264 + x_dx = np.array([x_mitigated, 2 * dp1 / (1.0 - e0 - e1)]) + return x_dx + + def extract_matter_x_single_cycle( + self, + initial_state: str, + cycle_number: int, + zero_trotter: bool = False, + readout_mitigate: bool = True, + post_select: bool = False, + tolerated_distance_to_error: int | float = np.inf, + ) -> np.ndarray: + """Get the expectation value of the mu term. + + This is X_matter in the LGT basis and a product of Xs on a matter site and the neighboring gauge sites in the dual basis. + + Args: + initial_state: Either "superposition" or "gauge_invariant" + cycle_number: The number of Trotter steps (can be 0). + zero_trotter: Whether to set the time step to 0 (for calibrating error mitigation). + readout_mitigate: Whether to use readout error mitigation. + post_select: Whether to post select on the gauge charges (intended for the gauge_invariant initial state only). + tolerated_distance_to_error: Allow errors greater than this distance. + + Returns: + The expectation value and statistical uncertainty of the interaction term. Shape is [value/uncertainty, site_number]. + + Raises: + ValueError: If the input arguments are not allowed. + """ + if initial_state not in ["single_sector", "gauge_invariant", "superposition"]: + raise ValueError( + "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" + ) + if cycle_number < 0: + raise ValueError("Cycle number should be nonnegative") + if initial_state == "superposition" and post_select: + raise ValueError("Post selection is intended for the gauge invariant initial state.") + + if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: + self.extract_readout_error_rates() + + measurements = self.extract_measurements( + 1, + initial_state, + cycle_number, + zero_trotter, + post_select and tolerated_distance_to_error == np.inf, + ) + x = [] + dx = [] + e0 = deepcopy(self.e0) + e1 = deepcopy(self.e1) + if post_select and readout_mitigate: + for idx in self.lgtdfl._matter_indices(): + e0[idx] = 0.0 + e1[idx] = 0.0 + for q in self.lgtdfl.matter_qubits: + qubits_i = [q_n for q_n in q.neighbors() if q_n in self.lgtdfl.all_qubits] + [q] + indices = np.array([ + self.lgtdfl.all_qubits.index(cast(cirq.GridQubit, qi)) for qi in qubits_i + ]) + if post_select and tolerated_distance_to_error < np.inf: + measurements_i = self.post_select_measurements_on_distance( + measurements, tolerated_distance_to_error, cast(list[cirq.GridQubit], qubits_i) + )[:, indices] + print( + f"Matter X term, cycle {cycle_number}, qubit {q}, {len(measurements_i)} counts of {len(measurements)}" + ) + else: + measurements_i = measurements[:, indices] + if readout_mitigate: + single_qubit_cmats = [] + qubits_to_mitigate = [] + + for i in indices: + e0_sq = e0[i] + e1_sq = e1[i] + single_qubit_cmats.append(np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]])) + qubits_to_mitigate.append(self.qubits[i]) + + tcm = cirq.experiments.TensoredConfusionMatrices(single_qubit_cmats, + [[q] for q in qubits_to_mitigate], + repetitions=measurements_i.shape[ + 0], timestamp=0.0) + + x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated(qubits_to_mitigate, + measurements_i) + else: + repetitions = measurements_i.shape[0] + p1 = np.mean(np.sum(measurements_i, axis=1) % 2) + dp1 = np.sqrt(p1 * (1 - p1) / (repetitions * self.num_gc_instances)) + x_i = 1 - 2 * p1 + dx_i = 2 * dp1 + x.append(x_i) + dx.append(dx_i) + return np.array([x, dx]) From 3e6e6f2d9b552c9c1a61021d24f7643c34370441 Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 02:50:33 +0000 Subject: [PATCH 2/8] black --- recirq/dfl/dfl_1d.py | 23 +- recirq/dfl/dfl_2d_second_order_trotter.py | 40 +++- recirq/dfl/dfl_experiment.py | 278 +++++++++++++++------- 3 files changed, 235 insertions(+), 106 deletions(-) diff --git a/recirq/dfl/dfl_1d.py b/recirq/dfl/dfl_1d.py index 1482718f..5e0874f3 100644 --- a/recirq/dfl/dfl_1d.py +++ b/recirq/dfl/dfl_1d.py @@ -184,7 +184,10 @@ def _change_basis(grid: Sequence[cirq.GridQubit]) -> cirq.Circuit: moment_2.append(cirq.CZ(q3, q2)) moment_h.append(cirq.H(q2)) return cirq.Circuit.from_moments( - cirq.Moment(moment_h), cirq.Moment(moment_1), cirq.Moment(moment_2), cirq.Moment(moment_h) + cirq.Moment(moment_h), + cirq.Moment(moment_1), + cirq.Moment(moment_2), + cirq.Moment(moment_h), ) @@ -225,15 +228,21 @@ def get_1d_dfl_experiment_circuits( basis="dual", ): if initial_state == "single_sector": - initial_circuit = _energy_bump_initial_state(grid, h, "single_sector", excited_qubits) + initial_circuit = _energy_bump_initial_state( + grid, h, "single_sector", excited_qubits + ) elif initial_state == "superposition": - initial_circuit = _energy_bump_initial_state(grid, h, "superposition", excited_qubits) + initial_circuit = _energy_bump_initial_state( + grid, h, "superposition", excited_qubits + ) else: raise ValueError("Invalid initial state") circuits = [] for n_cycle in tqdm(n_cycles): print(int(np.max([0, n_cycle - 1]))) - circ = initial_circuit + trotter_circuit(grid, n_cycle, dt, h, mu, two_qubit_gate) + circ = initial_circuit + trotter_circuit( + grid, n_cycle, dt, h, mu, two_qubit_gate + ) if basis == "lgt": circ += _change_basis(grid) elif basis == "dual": @@ -252,7 +261,11 @@ def get_1d_dfl_experiment_circuits( ) else: raise ValueError("Invalid option for basis") - circ_x = circ + _layer_hadamard(grid, "all") + cirq.measure([q for q in grid], key="m") + circ_x = ( + circ + + _layer_hadamard(grid, "all") + + cirq.measure([q for q in grid], key="m") + ) circuits.append(circ_z) circuits.append(circ_x) return circuits diff --git a/recirq/dfl/dfl_2d_second_order_trotter.py b/recirq/dfl/dfl_2d_second_order_trotter.py index 0239db70..38ec4431 100644 --- a/recirq/dfl/dfl_2d_second_order_trotter.py +++ b/recirq/dfl/dfl_2d_second_order_trotter.py @@ -93,7 +93,9 @@ def get_x_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: if q in self.gauge_qubits ] - def create_lgt_grid(self) -> Tuple[Tuple[cirq.GridQubit, ...], Tuple[cirq.GridQubit, ...]]: + def create_lgt_grid( + self, + ) -> Tuple[Tuple[cirq.GridQubit, ...], Tuple[cirq.GridQubit, ...]]: """Creates a lattice gauge theory (LGT) grid with simplified logic. Args: @@ -120,7 +122,8 @@ def create_lgt_grid(self) -> Tuple[Tuple[cirq.GridQubit, ...], Tuple[cirq.GridQu # Find matter neighbors for gauge qubits if neighbor.row == current_matter.row: # Horizontal gauge horizontal_neighbor = cirq.GridQubit( - neighbor.row, neighbor.col + (neighbor.col - current_matter.col) + neighbor.row, + neighbor.col + (neighbor.col - current_matter.col), ) if ( horizontal_neighbor in self.qubit_grid @@ -130,7 +133,8 @@ def create_lgt_grid(self) -> Tuple[Tuple[cirq.GridQubit, ...], Tuple[cirq.GridQu queue.append(horizontal_neighbor) else: # Vertical gauge vertical_neighbor = cirq.GridQubit( - neighbor.row + (neighbor.row - current_matter.row), neighbor.col + neighbor.row + (neighbor.row - current_matter.row), + neighbor.col, ) if ( vertical_neighbor in self.qubit_grid @@ -194,7 +198,9 @@ def trotter_circuit(self, n_cycles, two_qubit_gate="cz_simultaneous"): + self._layer_floquet_cphase_simultaneous_last_missing_piece() ) - def layer_floquet(self, two_qubit_gate="cz_simultaneous", layer="middle") -> cirq.Circuit: + def layer_floquet( + self, two_qubit_gate="cz_simultaneous", layer="middle" + ) -> cirq.Circuit: if two_qubit_gate == "cz_simultaneous": return self._layer_floquet_cz_simultaneous() @@ -477,7 +483,9 @@ def _compute_observables_one_instance_dual_basis( prod_x = bits_x_rescaled[:, self.all_qubits.index(q)] q_nbrs = [q_n for q_n in q.neighbors() if q_n in self.all_qubits] for q_n in q_nbrs: - prod_x *= bits_x_rescaled[:, self.all_qubits.index(cast(cirq.GridQubit, q_n))] + prod_x *= bits_x_rescaled[ + :, self.all_qubits.index(cast(cirq.GridQubit, q_n)) + ] matter_x_terms.append(prod_x) exp_matter_x = np.mean(matter_x_terms, axis=1) @@ -499,12 +507,14 @@ def _compute_observables_one_instance_dual_basis( em = ( exp_interaction[gauge_index] + self.h * exp_gauge_x[gauge_index] - + self.mu * (w0 * exp_matter_x[mat_1_index] + w1 * exp_matter_x[mat_2_index]) + + self.mu + * (w0 * exp_matter_x[mat_1_index] + w1 * exp_matter_x[mat_2_index]) ) ev = ( var_interaction[gauge_index] + self.h * var_gauge_x[gauge_index] - + self.mu * (w0 * var_matter_x[mat_1_index] + w1 * var_matter_x[mat_2_index]) + + self.mu + * (w0 * var_matter_x[mat_1_index] + w1 * var_matter_x[mat_2_index]) ) exp_energy.append(em) var_energy.append(ev) @@ -517,11 +527,15 @@ def _compute_observables_one_instance_dual_basis( ] def compute_observables_dual_basis( - self, bits_z: Sequence[npt.NDArray[np.int8]], bits_x: Sequence[npt.NDArray[np.int8]] + self, + bits_z: Sequence[npt.NDArray[np.int8]], + bits_x: Sequence[npt.NDArray[np.int8]], ) -> Sequence[np.ndarray]: num_instances = len(bits_z) if num_instances == 1: - return self._compute_observables_one_instance_dual_basis(bits_z[0], bits_x[0]) + return self._compute_observables_one_instance_dual_basis( + bits_z[0], bits_x[0] + ) else: # compute mean and variance of the mean over randomized instances matter_x = [] @@ -599,9 +613,13 @@ def get_2d_dfl_experiment_circuits( basis="dual", ): if initial_state == "single_sector": - initial_circuit = self._energy_bump_initial_state("single_sector", excited_qubits) + initial_circuit = self._energy_bump_initial_state( + "single_sector", excited_qubits + ) elif initial_state == "superposition": - initial_circuit = self._energy_bump_initial_state("superposition", excited_qubits) + initial_circuit = self._energy_bump_initial_state( + "superposition", excited_qubits + ) else: raise ValueError("Invalid initial state") circuits = [] diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py index ded5c034..c3405fa2 100644 --- a/recirq/dfl/dfl_experiment.py +++ b/recirq/dfl/dfl_experiment.py @@ -16,7 +16,9 @@ def _apply_gauge_compiling(seed: int, circuit: cirq.Circuit) -> cirq.Circuit: return cirq.merge_single_qubit_moments_to_phxz( - cirq.transformers.gauge_compiling.CPhaseGaugeTransformerMM()(circuit, rng_or_seed=seed) + cirq.transformers.gauge_compiling.CPhaseGaugeTransformerMM()( + circuit, rng_or_seed=seed + ) ) @@ -157,7 +159,10 @@ def generate_circuit_instances( if random_compile: circuits = list( - self.executor.map(partial(_apply_gauge_compiling, circuit=circuit), range(self.num_gc_instances)) + self.executor.map( + partial(_apply_gauge_compiling, circuit=circuit), + range(self.num_gc_instances), + ) ) else: circuits = [circuit] @@ -179,7 +184,10 @@ def create_readout_benchmark_circuits( random_circuits = [] random_circuits = [ cirq.Circuit( - [x_or_I(bit)(qubit) for bit, qubit in zip(bitstr, self.qubits[: len(bitstr)])] + [ + x_or_I(bit)(qubit) + for bit, qubit in zip(bitstr, self.qubits[: len(bitstr)]) + ] + [cirq.M(*self.qubits[: len(bitstr)], key="m")] ) for bitstr in bitstrs @@ -193,10 +201,12 @@ def identify_ideal_readout_bitstrings_from_circuits(self): break bit_fn = lambda gate: 1 if gate == cirq.X else 0 - self.readout_ideal_bitstrs = np.array([ - [bit_fn(circuit[0][qubit].gate) for qubit in self.qubits] - for circuit in self.all_circuits[-num_bitstrs:] - ]) + self.readout_ideal_bitstrs = np.array( + [ + [bit_fn(circuit[0][qubit].gate) for qubit in self.qubits] + for circuit in self.all_circuits[-num_bitstrs:] + ] + ) def generate_all_circuits( self, @@ -213,7 +223,11 @@ def generate_all_circuits( all_circuits = [] with tqdm( - total=2 * 2 * len(zero_trotter_options) * len(self.cycles) * num_gc_instances + total=2 + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * num_gc_instances ) as pbar: for initial_state in ["gauge_invariant", "superposition"]: for basis in ["z", "x"]: @@ -230,7 +244,9 @@ def generate_all_circuits( ) pbar.update(num_gc_instances) - bitstrs, readout_circuits = self.create_readout_benchmark_circuits(num_random_bitstrings) + bitstrs, readout_circuits = self.create_readout_benchmark_circuits( + num_random_bitstrings + ) all_circuits += readout_circuits expected_number = ( @@ -242,7 +258,9 @@ def generate_all_circuits( self.all_circuits = all_circuits self.readout_ideal_bitstrs = bitstrs - def load_circuits_from_file(self, filename: str, old_qubits_list: list[cirq.Qid] | None = None): + def load_circuits_from_file( + self, filename: str, old_qubits_list: list[cirq.Qid] | None = None + ): """Load the circuits from a file. Args: @@ -259,7 +277,9 @@ def load_circuits_from_file(self, filename: str, old_qubits_list: list[cirq.Qid] self.all_circuits = circuits self.identify_ideal_readout_bitstrings_from_circuits() - def shuffle_circuits_and_save(self, batch_size: int = 500, delete_circuit_list: bool = True): + def shuffle_circuits_and_save( + self, batch_size: int = 500, delete_circuit_list: bool = True + ): """Shuffle all of the circuits and save them to files. Args: @@ -281,7 +301,9 @@ def shuffle_circuits_and_save(self, batch_size: int = 500, delete_circuit_list: circuits_i = circuits_shuffled[start_idx : (start_idx + batch_size)] pickle.dump( circuits_i, - open(self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "wb"), + open( + self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "wb" + ), ) params = { @@ -290,7 +312,9 @@ def shuffle_circuits_and_save(self, batch_size: int = 500, delete_circuit_list: "readout_ideal_bitstrs": self.readout_ideal_bitstrs, "batch_size": batch_size, } - pickle.dump(params, open(self.save_directory + "/shuffled_circuits/params.pickle", "wb")) + pickle.dump( + params, open(self.save_directory + "/shuffled_circuits/params.pickle", "wb") + ) if delete_circuit_list: del self.all_circuits @@ -447,7 +471,9 @@ def run_experiment_from_saved_circuits( ) # load the parameters for the shuffled circuits: - params = pickle.load(open(self.save_directory + "/shuffled_circuits/params.pickle", "rb")) + params = pickle.load( + open(self.save_directory + "/shuffled_circuits/params.pickle", "rb") + ) indices = params["indices"] inv_map = params["inv_map"] self.readout_ideal_bitstrs = params["readout_ideal_bitstrs"] @@ -480,7 +506,9 @@ def run_experiment_from_saved_circuits( * len(zero_trotter_options) * len(self.cycles) * self.num_gc_instances - + zero_trotter_number * len(self.cycles) * self.num_gc_instances + + zero_trotter_number + * len(self.cycles) + * self.num_gc_instances + cycle_number * self.num_gc_instances ) end_index = start_index + self.num_gc_instances @@ -494,7 +522,9 @@ def run_experiment_from_saved_circuits( # run the experiment and save shuffled measurements to files (avoids memory issues) for start_idx in tqdm(range(initial_start_index, tot_num_circuits, batch_size)): circuits_i = pickle.load( - open(self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "rb") + open( + self.save_directory + f"/shuffled_circuits/{start_idx}.pickle", "rb" + ) ) if old_qubits is not None and new_qubits is not None: print("Transforming qubits") @@ -502,15 +532,21 @@ def run_experiment_from_saved_circuits( circuit.transform_qubits(dict(zip(old_qubits, new_qubits))) for circuit in circuits_i ] - print(f"Shots this batch: {sum(repetitions[start_idx : (start_idx + batch_size)])}") + print( + f"Shots this batch: {sum(repetitions[start_idx : (start_idx + batch_size)])}" + ) result = self.sampler.run_batch( - circuits_i, repetitions=repetitions[start_idx : (start_idx + batch_size)] + circuits_i, + repetitions=repetitions[start_idx : (start_idx + batch_size)], ) results_i = [res[0].measurements["m"].astype(bool) for res in result] if not os.path.isdir(self.save_directory + f"/shuffled_results"): os.mkdir(self.save_directory + f"/shuffled_results") pickle.dump( - results_i, open(self.save_directory + f"/shuffled_results/{start_idx}.pickle", "wb") + results_i, + open( + self.save_directory + f"/shuffled_results/{start_idx}.pickle", "wb" + ), ) def load_shuffled_measurements(self): @@ -518,7 +554,9 @@ def load_shuffled_measurements(self): First, you should have run `run_experiment_from_saved_circuits` or `run_experiment`. """ - params = pickle.load(open(self.save_directory + "/shuffled_circuits/params.pickle", "rb")) + params = pickle.load( + open(self.save_directory + "/shuffled_circuits/params.pickle", "rb") + ) indices = params["indices"] inv_map = params["inv_map"] self.readout_ideal_bitstrs = params["readout_ideal_bitstrs"] @@ -532,7 +570,9 @@ def load_shuffled_measurements(self): measurements_shuffled = [] for start_idx in tqdm(range(0, tot_num_circuits, batch_size)): measurements_shuffled += pickle.load( - open(self.save_directory + f"/shuffled_results/{start_idx}.pickle", "rb") + open( + self.save_directory + f"/shuffled_results/{start_idx}.pickle", "rb" + ) ) self.measurements = [measurements_shuffled[i] for i in inv_map] @@ -583,7 +623,9 @@ def extract_measurements( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if initial_state == "single_sector": initial_state = "gauge_invariant" init_state_number = ["gauge_invariant", "superposition"].index(initial_state) @@ -596,7 +638,10 @@ def extract_measurements( * len(zero_trotter_options) * len(self.cycles) * self.num_gc_instances - + basis_number * len(zero_trotter_options) * len(self.cycles) * self.num_gc_instances + + basis_number + * len(zero_trotter_options) + * len(self.cycles) + * self.num_gc_instances + zero_trotter_number * len(self.cycles) * self.num_gc_instances + cycle_number * self.num_gc_instances ) @@ -605,7 +650,9 @@ def extract_measurements( self.measurements[start_index : (start_index + self.num_gc_instances)] ) repetitions = len(measurements[0]) - measurements = measurements.reshape(repetitions * self.num_gc_instances, len(self.qubits)) + measurements = measurements.reshape( + repetitions * self.num_gc_instances, len(self.qubits) + ) if post_select: mask = np.all(measurements[:, self.matter_sites] == False, axis=1) measurements = measurements[mask, :] @@ -644,7 +691,9 @@ def extract_zzz_single_cycle( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: self.extract_readout_error_rates() @@ -702,19 +751,23 @@ def extract_interaction( "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" ) if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") - - return np.array([ - self.extract_zzz_single_cycle( - initial_state, - cycle, - zero_trotter, - readout_mitigate, - post_select, - tolerated_distance_to_error, + raise ValueError( + "Post selection is intended for the gauge invariant initial state." ) - for cycle in tqdm(self.cycles, total=len(self.cycles)) - ]) + + return np.array( + [ + self.extract_zzz_single_cycle( + initial_state, + cycle, + zero_trotter, + readout_mitigate, + post_select, + tolerated_distance_to_error, + ) + for cycle in tqdm(self.cycles, total=len(self.cycles)) + ] + ) def extract_gauge_x_single_cycle( self, @@ -748,7 +801,9 @@ def extract_gauge_x_single_cycle( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: self.extract_readout_error_rates() @@ -806,19 +861,23 @@ def extract_gauge_x( "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" ) if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") - - return np.array([ - self.extract_gauge_x_single_cycle( - initial_state, - cycle, - readout_mitigate=readout_mitigate, - post_select=post_select, - zero_trotter=zero_trotter, - tolerated_distance_to_error=tolerated_distance_to_error, + raise ValueError( + "Post selection is intended for the gauge invariant initial state." ) - for cycle in self.cycles - ]) + + return np.array( + [ + self.extract_gauge_x_single_cycle( + initial_state, + cycle, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + tolerated_distance_to_error=tolerated_distance_to_error, + ) + for cycle in self.cycles + ] + ) def extract_matter_x_single_cycle( self, @@ -854,7 +913,9 @@ def extract_matter_x_single_cycle( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: self.extract_readout_error_rates() @@ -880,7 +941,9 @@ def extract_matter_x_single_cycle( self.qubits[q_idx], self.qubits[(q_idx + 1) % len(self.qubits)], ] - indices = np.array([self.qubits.index(cast(cirq.GridQubit, qi)) for qi in qubits_i]) + indices = np.array( + [self.qubits.index(cast(cirq.GridQubit, qi)) for qi in qubits_i] + ) if post_select and tolerated_distance_to_error < np.inf: raise NotImplementedError else: @@ -892,12 +955,21 @@ def extract_matter_x_single_cycle( for i in indices: e0_sq = e0[i] e1_sq = e1[i] - single_qubit_cmats.append(np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]])) + single_qubit_cmats.append( + np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]]) + ) qubits_to_mitigate.append(self.qubits[i]) - tcm = cirq.experiments.TensoredConfusionMatrices(single_qubit_cmats, [[q] for q in qubits_to_mitigate], repetitions=measurements_i.shape[0], timestamp = 0.0) + tcm = cirq.experiments.TensoredConfusionMatrices( + single_qubit_cmats, + [[q] for q in qubits_to_mitigate], + repetitions=measurements_i.shape[0], + timestamp=0.0, + ) - x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated(qubits_to_mitigate, measurements_i) + x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated( + qubits_to_mitigate, measurements_i + ) else: repetitions = measurements_i.shape[0] p1 = np.mean(np.sum(measurements_i, axis=1) % 2) @@ -938,19 +1010,23 @@ def extract_matter_x( "initial_state should be 'superposition' or 'gauge_invariant' (or 'single_sector' which is the same as 'gauge_invariant')" ) if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") - - return np.array([ - self.extract_matter_x_single_cycle( - initial_state, - cycle, - readout_mitigate=readout_mitigate, - post_select=post_select, - zero_trotter=zero_trotter, - tolerated_distance_to_error=tolerated_distance_to_error, + raise ValueError( + "Post selection is intended for the gauge invariant initial state." ) - for cycle in self.cycles - ]) + + return np.array( + [ + self.extract_matter_x_single_cycle( + initial_state, + cycle, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + tolerated_distance_to_error=tolerated_distance_to_error, + ) + for cycle in self.cycles + ] + ) class DFLExperiment2D(DFLExperiment): @@ -1095,9 +1171,9 @@ def generate_circuit_instances( """ for q in qubits_to_fix_disorder: - assert q in self.lgtdfl.matter_qubits, ( - "qubits_to_fix_disorder must all be matter qubits" - ) + assert ( + q in self.lgtdfl.matter_qubits + ), "qubits_to_fix_disorder must all be matter qubits" if initial_state not in ["gauge_invariant", "single_sector", "superposition"]: raise ValueError( @@ -1120,7 +1196,9 @@ def generate_circuit_instances( [ncycles], excited_qubits=self.excited_qubits, n_instances=1, - two_qubit_gate="cphase_simultaneous" if self.use_cphase else "cz_simultaneous", + two_qubit_gate=( + "cphase_simultaneous" if self.use_cphase else "cz_simultaneous" + ), )[basis_index] new_moment_0 = cirq.Moment( list(circuit[0].operations) @@ -1170,10 +1248,12 @@ def post_select_measurements_on_distance( """ matter_qubits = self.lgtdfl.matter_qubits matter_indices = np.array(self.lgtdfl._matter_indices()) - distances = np.array([ - min(_distance(q_operator, qubit) for q_operator in operator_qubits) - for qubit in matter_qubits - ]) + distances = np.array( + [ + min(_distance(q_operator, qubit) for q_operator in operator_qubits) + for qubit in matter_qubits + ] + ) errors = measurements[:, matter_indices] distance_to_error = np.min(np.where(errors, distances, np.inf), axis=1) return measurements[distance_to_error >= tolerated_distance_to_error] @@ -1211,7 +1291,9 @@ def extract_zzz_single_cycle( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: self.extract_readout_error_rates() @@ -1288,7 +1370,9 @@ def extract_gauge_x_single_cycle( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: self.extract_readout_error_rates() @@ -1367,7 +1451,9 @@ def extract_matter_x_single_cycle( if cycle_number < 0: raise ValueError("Cycle number should be nonnegative") if initial_state == "superposition" and post_select: - raise ValueError("Post selection is intended for the gauge invariant initial state.") + raise ValueError( + "Post selection is intended for the gauge invariant initial state." + ) if readout_mitigate and len(self.e0) == 0 or len(self.e1) == 0: self.extract_readout_error_rates() @@ -1388,13 +1474,20 @@ def extract_matter_x_single_cycle( e0[idx] = 0.0 e1[idx] = 0.0 for q in self.lgtdfl.matter_qubits: - qubits_i = [q_n for q_n in q.neighbors() if q_n in self.lgtdfl.all_qubits] + [q] - indices = np.array([ - self.lgtdfl.all_qubits.index(cast(cirq.GridQubit, qi)) for qi in qubits_i - ]) + qubits_i = [ + q_n for q_n in q.neighbors() if q_n in self.lgtdfl.all_qubits + ] + [q] + indices = np.array( + [ + self.lgtdfl.all_qubits.index(cast(cirq.GridQubit, qi)) + for qi in qubits_i + ] + ) if post_select and tolerated_distance_to_error < np.inf: measurements_i = self.post_select_measurements_on_distance( - measurements, tolerated_distance_to_error, cast(list[cirq.GridQubit], qubits_i) + measurements, + tolerated_distance_to_error, + cast(list[cirq.GridQubit], qubits_i), )[:, indices] print( f"Matter X term, cycle {cycle_number}, qubit {q}, {len(measurements_i)} counts of {len(measurements)}" @@ -1408,16 +1501,21 @@ def extract_matter_x_single_cycle( for i in indices: e0_sq = e0[i] e1_sq = e1[i] - single_qubit_cmats.append(np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]])) + single_qubit_cmats.append( + np.array([[1 - e0_sq, e1_sq], [e0_sq, 1 - e1_sq]]) + ) qubits_to_mitigate.append(self.qubits[i]) - tcm = cirq.experiments.TensoredConfusionMatrices(single_qubit_cmats, - [[q] for q in qubits_to_mitigate], - repetitions=measurements_i.shape[ - 0], timestamp=0.0) + tcm = cirq.experiments.TensoredConfusionMatrices( + single_qubit_cmats, + [[q] for q in qubits_to_mitigate], + repetitions=measurements_i.shape[0], + timestamp=0.0, + ) - x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated(qubits_to_mitigate, - measurements_i) + x_i, dx_i = tcm.readout_mitigation_pauli_uncorrelated( + qubits_to_mitigate, measurements_i + ) else: repetitions = measurements_i.shape[0] p1 = np.mean(np.sum(measurements_i, axis=1) % 2) From 28b7159967f3e4fe882820d698ffdee36296e975 Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 03:05:02 +0000 Subject: [PATCH 3/8] fix parallel pool typo --- recirq/dfl/dfl_experiment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py index c3405fa2..5895f24f 100644 --- a/recirq/dfl/dfl_experiment.py +++ b/recirq/dfl/dfl_experiment.py @@ -1073,7 +1073,7 @@ def __init__( self.e0 = np.array([]) self.e1 = np.array([]) self.num_gc_instances = num_gc_instances - self.executor = process_pool.get_executor() + self.executor = concurrent.futures.ProcessPoolExecutor() self.readout_ideal_bitstrs = np.array([]) self.save_directory = save_directory self.lgtdfl = dfl_2d.LGTDFL(qubits, origin_qubit, tau, h, mu) From 7cea5156004bc254a4b0484ba5c7b69e7477826c Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 04:01:14 +0000 Subject: [PATCH 4/8] save postselection fraction --- recirq/dfl/dfl_experiment.py | 59 ++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py index 5895f24f..d1310146 100644 --- a/recirq/dfl/dfl_experiment.py +++ b/recirq/dfl/dfl_experiment.py @@ -1317,6 +1317,25 @@ def extract_zzz_single_cycle( print( f"Interaction term, cycle {cycle_number}, qubit {op_qubit}, {len(measurements_q)} counts of {len(measurements)}" ) + if os.path.isfile("dfl_surviving_counts.pickle"): + log_dict = pickle.load(open("dfl_surviving_counts.pickle", "rb")) + else: + log_dict = { + "gauge_x": [{} for _ in range(31)], + "matter_x": [{} for _ in range(31)], + "interaction": [{} for _ in range(31)], + "gauge_x_zero_trotter": [{} for _ in range(31)], + "matter_x_zero_trotter": [{} for _ in range(31)], + "interaction_zero_trotter": [{} for _ in range(31)], + "total_measurements": [0 for _ in range(31)], + } + key = "interaction" + if zero_trotter: + key += "_zero_trotter" + log_dict[key][cycle_number][op_qubit] = len(measurements_q) + log_dict["total_measurements"][cycle_number] = len(measurements) + pickle.dump(log_dict, open("dfl_surviving_counts.pickle", "wb")) + if len(measurements_q) == 0: p1[idx] = np.nan dp1[idx] = np.nan @@ -1396,6 +1415,26 @@ def extract_gauge_x_single_cycle( print( f"Gauge X term, cycle {cycle_number}, qubit {op_qubit}, {len(measurements_q)} counts of {len(measurements)}" ) + + if os.path.isfile("dfl_surviving_counts.pickle"): + log_dict = pickle.load(open("dfl_surviving_counts.pickle", "rb")) + else: + log_dict = { + "gauge_x": [{} for _ in range(31)], + "matter_x": [{} for _ in range(31)], + "interaction": [{} for _ in range(31)], + "gauge_x_zero_trotter": [{} for _ in range(31)], + "matter_x_zero_trotter": [{} for _ in range(31)], + "interaction_zero_trotter": [{} for _ in range(31)], + "total_measurements": [0 for _ in range(31)], + } + key = "gauge_x" + if zero_trotter: + key += "_zero_trotter" + log_dict[key][cycle_number][op_qubit] = len(measurements_q) + log_dict["total_measurements"][cycle_number] = len(measurements) + pickle.dump(log_dict, open("dfl_surviving_counts.pickle", "wb")) + if len(measurements_q) == 0: p1[idx] = np.nan dp1[idx] = np.nan @@ -1492,6 +1531,26 @@ def extract_matter_x_single_cycle( print( f"Matter X term, cycle {cycle_number}, qubit {q}, {len(measurements_i)} counts of {len(measurements)}" ) + + if os.path.isfile("dfl_surviving_counts.pickle"): + log_dict = pickle.load(open("dfl_surviving_counts.pickle", "rb")) + else: + log_dict = { + "gauge_x": [{} for _ in range(31)], + "matter_x": [{} for _ in range(31)], + "interaction": [{} for _ in range(31)], + "gauge_x_zero_trotter": [{} for _ in range(31)], + "matter_x_zero_trotter": [{} for _ in range(31)], + "interaction_zero_trotter": [{} for _ in range(31)], + "total_measurements": [0 for _ in range(31)], + } + key = "matter_x" + if zero_trotter: + key += "_zero_trotter" + log_dict[key][cycle_number][q] = len(measurements_i) + log_dict["total_measurements"][cycle_number] = len(measurements) + pickle.dump(log_dict, open("dfl_surviving_counts.pickle", "wb")) + else: measurements_i = measurements[:, indices] if readout_mitigate: From 36ff4828a20aa2fc573fd6428f62f34802957206 Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 04:03:16 +0000 Subject: [PATCH 5/8] Add manual test --- recirq/dfl/dfl_experiment_test_manual.py | 165 +++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 recirq/dfl/dfl_experiment_test_manual.py diff --git a/recirq/dfl/dfl_experiment_test_manual.py b/recirq/dfl/dfl_experiment_test_manual.py new file mode 100644 index 00000000..94ec239c --- /dev/null +++ b/recirq/dfl/dfl_experiment_test_manual.py @@ -0,0 +1,165 @@ +import os + +import cirq +import numpy as np + +from . import dfl_experiment as dfl + +if not os.path.isdir("test_circuits"): + os.mkdir("test_circuits") + + +def test_dfl_experiment_3x3(): + """This test takes a minute to run, so I'm leaving it as a manual test.""" + qubits = cirq.GridQubit.rect(3, 3, 0, 0) + sampler = cirq.Simulator() + for num_rc_instances in [1, 2]: + for use_cphase in [False, True]: + dfl_expt = dfl.DFLExperiment2D( + sampler, + qubits, + origin_qubit=cirq.q(0, 0), + save_directory="test_circuits", + num_rc_instances=num_rc_instances, + excited_qubits=[cirq.q(0, 1)], + cycles=np.array([0, 1]), + use_cphase=use_cphase, + ) + for random_compile in [False, True]: + for dynamical_decouple in [False, True]: + dfl_expt.run_experiment( + random_compile=random_compile, + dynamical_decouple=dynamical_decouple, + repetitions_post_selection=np.ones(len(dfl_expt.cycles), dtype=int) + * 10_000, + ) + + # check readout error rates + assert np.all(dfl_expt.e0 == np.zeros(len(dfl_expt.qubits))) + assert np.all(dfl_expt.e1 == np.zeros(len(dfl_expt.qubits))) + + # check gauge_x + signs = np.ones(len(dfl_expt.lgtdfl._gauge_indices())) + signs[0] = -1 + gauge_expected = dfl_expt.h / np.sqrt(dfl_expt.h**2 + 1) * signs + + for readout_mitigate in [False, True]: + for initial_state in ["gauge_invariant", "superposition"]: + for zero_trotter in [False, True]: + for post_select in [False, True]: + if post_select and initial_state == "superposition": + continue + else: + gauge_x = dfl_expt.extract_gauge_x( + initial_state, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose( + gauge_x[0, 0], + gauge_expected, + atol=5 * gauge_x[0, 1], + ) + ) + if zero_trotter: + assert np.all( + np.isclose( + gauge_x[0, 0], + gauge_x[1, 0], + atol=5 + * np.sqrt( + gauge_x[0, 1] ** 2 + gauge_x[1, 1] ** 2 + ), + ) + ) + + # check matter_x + matter_expected = [] + for q_matter in dfl_expt.lgtdfl.matter_qubits: + neighbor_indices = np.array( + [ + list(dfl_expt.lgtdfl.gauge_qubits).index(q) + for q in q_matter.neighbors() + if q in dfl_expt.qubits + ] + ) + matter_expected.append(np.prod(gauge_expected[neighbor_indices])) + + for readout_mitigate in [False, True]: + for post_select in [False, True]: + for zero_trotter in [False, True]: + matter_x = dfl_expt.extract_matter_x( + "gauge_invariant", + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose( + matter_x[0, 0], matter_expected, atol=5 * matter_x[0, 1] + ) + ) + if zero_trotter: + assert np.all( + np.isclose( + matter_x[0, 0], + matter_x[1, 0], + atol=5 + * np.sqrt(matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2), + ) + ) + + for readout_mitigate in [False, True]: + for zero_trotter in [False, True]: + matter_x = dfl_expt.extract_matter_x( + "superposition", + readout_mitigate=readout_mitigate, + post_select=False, + zero_trotter=zero_trotter, + ) + assert np.all(np.isclose(matter_x[0, 0], 0.0, atol=5 * matter_x[0, 1])) + if zero_trotter: + assert np.all( + np.isclose( + matter_x[0, 0], + matter_x[1, 0], + atol=5 * np.sqrt(matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2), + ) + ) + + # check interaction + interaction_expected = 1 / np.sqrt(dfl_expt.h**2 + 1) * signs + for readout_mitigate in [False, True]: + for initial_state in ["gauge_invariant", "superposition"]: + for zero_trotter in [False, True]: + for post_select in [False, True]: + if post_select and initial_state == "superposition": + continue + else: + interaction = dfl_expt.extract_interaction( + initial_state, + readout_mitigate=readout_mitigate, + post_select=post_select, + zero_trotter=zero_trotter, + ) + assert np.all( + np.isclose( + interaction[0, 0], + interaction_expected, + atol=5 * interaction[0, 1], + ) + ) + if zero_trotter: + assert np.all( + np.isclose( + interaction[0, 0], + interaction[1, 0], + atol=5 + * np.sqrt( + interaction[0, 1] ** 2 + + interaction[1, 1] ** 2 + ), + ) + ) From 66ddd3b28f126c4a3ccdbc0a90ae5e2c676f9391 Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 04:25:31 +0000 Subject: [PATCH 6/8] fix test --- recirq/dfl/dfl_experiment_test_manual.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/recirq/dfl/dfl_experiment_test_manual.py b/recirq/dfl/dfl_experiment_test_manual.py index 94ec239c..48c86c34 100644 --- a/recirq/dfl/dfl_experiment_test_manual.py +++ b/recirq/dfl/dfl_experiment_test_manual.py @@ -13,14 +13,14 @@ def test_dfl_experiment_3x3(): """This test takes a minute to run, so I'm leaving it as a manual test.""" qubits = cirq.GridQubit.rect(3, 3, 0, 0) sampler = cirq.Simulator() - for num_rc_instances in [1, 2]: + for num_gc_instances in [1, 2]: for use_cphase in [False, True]: dfl_expt = dfl.DFLExperiment2D( sampler, qubits, origin_qubit=cirq.q(0, 0), save_directory="test_circuits", - num_rc_instances=num_rc_instances, + num_gc_instances=num_gc_instances, excited_qubits=[cirq.q(0, 1)], cycles=np.array([0, 1]), use_cphase=use_cphase, From 828fdc4333add127eddf0914e00805d2575ada04 Mon Sep 17 00:00:00 2001 From: Eliott Rosenberg Date: Wed, 5 Nov 2025 04:42:48 +0000 Subject: [PATCH 7/8] fix terminology --- recirq/dfl/dfl_experiment.py | 24 ++++++++--------- recirq/dfl/dfl_experiment_test_manual.py | 33 +++++++++++++++++------- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py index d1310146..7782fdc5 100644 --- a/recirq/dfl/dfl_experiment.py +++ b/recirq/dfl/dfl_experiment.py @@ -134,7 +134,7 @@ def generate_circuit_instances( initial_state: str, ncycles: int, basis: str, - random_compile: bool = True, + gauge_compile: bool = True, dynamical_decouple: bool = True, dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), zero_trotter: bool = False, @@ -157,7 +157,7 @@ def generate_circuit_instances( "dual", )[["z", "x"].index(basis)] - if random_compile: + if gauge_compile: circuits = list( self.executor.map( partial(_apply_gauge_compiling, circuit=circuit), @@ -210,12 +210,12 @@ def identify_ideal_readout_bitstrings_from_circuits(self): def generate_all_circuits( self, - random_compile: bool = True, + gauge_compile: bool = True, dynamical_decouple: bool = True, dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), num_random_bitstrings: int = 30, ): - if not random_compile: + if not gauge_compile: self.num_gc_instances = 1 num_gc_instances = self.num_gc_instances @@ -237,7 +237,7 @@ def generate_all_circuits( initial_state, ncycles, basis, - random_compile, + gauge_compile, dynamical_decouple, dd_sequence, zero_trotter, @@ -325,7 +325,7 @@ def run_experiment( repetitions_non_post_selection: int | list[int] = 2000, batch_size: int = 500, readout_repetitions: int = 1000, - random_compile: bool = True, + gauge_compile: bool = True, dynamical_decouple: bool = True, dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), num_random_bitstrings: int = 30, @@ -377,14 +377,14 @@ def run_experiment( repetitions_post_selection: How many repetitions to use for the gauge invariant initial state at each cycle number. repetitions_non_post_selection: How many repetitions to use for the superposition initial state. batch_size: The maximum number of circuits per file and per run_batch call. - random_compile: Whether to add randomized compiling. + gauge_compile: Whether to add gauge compiling. dynamical_decouple: Whether to add dynamical decoupling. dd_sequence: The dynamical decoupling sequence to use. num_random_bitstrings: The number of bitstrings to use for readout mitigation. readout_repetitions: The number of repetitions to use for readout benchmarking. """ self.generate_all_circuits( - random_compile=random_compile, + gauge_compile=gauge_compile, dynamical_decouple=dynamical_decouple, dd_sequence=dd_sequence, num_random_bitstrings=num_random_bitstrings, @@ -1041,7 +1041,7 @@ class DFLExperiment2D(DFLExperiment): tau: The Trotter step size. rng: The pseudorandom number generator to use for readout benchmarking. cycles: The number of cycles for which to run the experiment. - num_gc_instances: The number of randomized compiling instances to use. + num_gc_instances: The number of gauge compiling instances to use. excited_qubits: Which qubits to excite. use_cphase: Whether to use cphase gates. include_zero_trotter: Whether to include circuits with the Trotter step set to 0. @@ -1142,7 +1142,7 @@ def generate_circuit_instances( initial_state: str, ncycles: int, basis: str, - random_compile: bool = True, + gauge_compile: bool = True, dynamical_decouple: bool = True, dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), zero_trotter: bool = False, @@ -1156,7 +1156,7 @@ def generate_circuit_instances( initial_state: Which initial state, either "gauge_invariant" or "superposition". ncycles: The number of Trotter steps (can be 0). basis: The basis in which to measure. Either "x" or "z". - random_compile: Whether to apply randomized compiling. + gauge_compile: Whether to apply gauge compiling. dynamical_decouple: Whether to apply dynamical decoupling. dd_sequence: The dynamical decoupling sequence to use. zero_trotter: Whether to set the trotter step size to 0 (used to calibrate error mitigation). @@ -1209,7 +1209,7 @@ def generate_circuit_instances( ) circuit = cirq.Circuit(new_moment_0) + circuit[1:] - if random_compile: + if gauge_compile: circuits = list( self.executor.map( partial(_apply_gauge_compiling, circuit=circuit), diff --git a/recirq/dfl/dfl_experiment_test_manual.py b/recirq/dfl/dfl_experiment_test_manual.py index 48c86c34..1b7792f3 100644 --- a/recirq/dfl/dfl_experiment_test_manual.py +++ b/recirq/dfl/dfl_experiment_test_manual.py @@ -25,12 +25,14 @@ def test_dfl_experiment_3x3(): cycles=np.array([0, 1]), use_cphase=use_cphase, ) - for random_compile in [False, True]: + for gauge_compile in [False, True]: for dynamical_decouple in [False, True]: dfl_expt.run_experiment( - random_compile=random_compile, + gauge_compile=gauge_compile, dynamical_decouple=dynamical_decouple, - repetitions_post_selection=np.ones(len(dfl_expt.cycles), dtype=int) + repetitions_post_selection=np.ones( + len(dfl_expt.cycles), dtype=int + ) * 10_000, ) @@ -70,7 +72,8 @@ def test_dfl_experiment_3x3(): gauge_x[1, 0], atol=5 * np.sqrt( - gauge_x[0, 1] ** 2 + gauge_x[1, 1] ** 2 + gauge_x[0, 1] ** 2 + + gauge_x[1, 1] ** 2 ), ) ) @@ -85,7 +88,9 @@ def test_dfl_experiment_3x3(): if q in dfl_expt.qubits ] ) - matter_expected.append(np.prod(gauge_expected[neighbor_indices])) + matter_expected.append( + np.prod(gauge_expected[neighbor_indices]) + ) for readout_mitigate in [False, True]: for post_select in [False, True]: @@ -98,7 +103,9 @@ def test_dfl_experiment_3x3(): ) assert np.all( np.isclose( - matter_x[0, 0], matter_expected, atol=5 * matter_x[0, 1] + matter_x[0, 0], + matter_expected, + atol=5 * matter_x[0, 1], ) ) if zero_trotter: @@ -107,7 +114,10 @@ def test_dfl_experiment_3x3(): matter_x[0, 0], matter_x[1, 0], atol=5 - * np.sqrt(matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2), + * np.sqrt( + matter_x[0, 1] ** 2 + + matter_x[1, 1] ** 2 + ), ) ) @@ -119,13 +129,18 @@ def test_dfl_experiment_3x3(): post_select=False, zero_trotter=zero_trotter, ) - assert np.all(np.isclose(matter_x[0, 0], 0.0, atol=5 * matter_x[0, 1])) + assert np.all( + np.isclose(matter_x[0, 0], 0.0, atol=5 * matter_x[0, 1]) + ) if zero_trotter: assert np.all( np.isclose( matter_x[0, 0], matter_x[1, 0], - atol=5 * np.sqrt(matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2), + atol=5 + * np.sqrt( + matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2 + ), ) ) From fccc8f3fecc698f5d649b579eb587fc9c110be7a Mon Sep 17 00:00:00 2001 From: Shashwat Kumar Date: Sat, 8 Nov 2025 01:23:23 +0000 Subject: [PATCH 8/8] Added the missing public method docstrings --- recirq/dfl/dfl_1d.py | 43 +++++++- recirq/dfl/dfl_2d_second_order_trotter.py | 116 ++++++++++++++++++++-- recirq/dfl/dfl_experiment.py | 82 ++++++++++++--- recirq/dfl/dfl_experiment_test_manual.py | 19 ++-- 4 files changed, 229 insertions(+), 31 deletions(-) diff --git a/recirq/dfl/dfl_1d.py b/recirq/dfl/dfl_1d.py index 5e0874f3..0ce06db5 100644 --- a/recirq/dfl/dfl_1d.py +++ b/recirq/dfl/dfl_1d.py @@ -18,6 +18,23 @@ def trotter_circuit( mu: float, two_qubit_gate="cz", ) -> cirq.Circuit: + """Constructs the Trotter circuit for 1D DFL simulation. + + Args: + grid: The 1D sequence of qubits used in the experiment. + n_cycles: The number of Trotter steps/cycles to include. + dt: The time step size for the Trotterization. + h: The gauge field strength coefficient. + mu: The matter field strength coefficient. + two_qubit_gate: The type of two-qubit gate to use in the layers. + Valid values are "cz" or "cphase". + + Returns: + The complete cirq.Circuit for the Trotter evolution. + + Raises: + ValueError: If an invalid `two_qubit_gate` option is provided. + """ if two_qubit_gate == "cz": return _layer_floquet_cz(grid, dt, h, mu) * n_cycles @@ -226,7 +243,31 @@ def get_1d_dfl_experiment_circuits( n_instances: int = 10, two_qubit_gate: str = "cz", basis="dual", -): +) -> List[cirq.Circuit]: + """Generates the circuits needed for the 1D DFL experiment. + + Args: + grid: The 1D sequence of qubits used in the experiment. + initial_state: The initial state preparation. Valid values are + "single_sector" or "superposition". + n_cycles: The number of Trotter steps (cycles) to simulate. + excited_qubits: Qubits to be excited in the initial state. + dt: The time step size for the Trotterization. + h: The gauge field strength coefficient. + mu: The matter field strength coefficient. + n_instances: The number of instances to generate. + two_qubit_gate: The type of two-qubit gate to use in the Trotter step. + Valid values are "cz" or "cphase". + basis: The basis for the final circuit structure. Valid values are + "lgt" or "dual". + + Returns: + A list of all generated cirq.Circuit objects. + + Raises: + ValueError: If an invalid option for `initial_state` or + `basis` is given. + """ if initial_state == "single_sector": initial_circuit = _energy_bump_initial_state( grid, h, "single_sector", excited_qubits diff --git a/recirq/dfl/dfl_2d_second_order_trotter.py b/recirq/dfl/dfl_2d_second_order_trotter.py index 38ec4431..2641840d 100644 --- a/recirq/dfl/dfl_2d_second_order_trotter.py +++ b/recirq/dfl/dfl_2d_second_order_trotter.py @@ -50,8 +50,17 @@ def __init__( self.all_qubits = sorted(self.matter_qubits + self.gauge_qubits) def get_y_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: - """Returns horizontal neighbors. If matter qubit is provided return gauge qubits and vice - versa.""" + """Returns the vertical (row-axis) neighbors of a qubit. + If a matter qubit is provided, this returns the neighboring gauge qubits, + and vice-versa. + + Args: + qubit: The central qubit. + + Returns: + A sequence of neighboring qubits of the opposite type that are + within the defined grid. + """ if qubit in self.gauge_qubits: return [ q @@ -72,8 +81,17 @@ def get_y_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: ] def get_x_neighbors(self, qubit: cirq.GridQubit) -> Sequence[cirq.GridQubit]: - """Returns vertical neighbors. If matter qubit is provided return gauge qubits and vice - versa.""" + """Returns the horizontal (column-axis) neighbors of a qubit. + If a matter qubit is provided, this returns the neighboring gauge qubits, + and vice-versa. + + Args: + qubit: The central qubit. + + Returns: + A sequence of neighboring qubits of the opposite type that are + within the defined grid. + """ if qubit in self.gauge_qubits: return [ q @@ -161,6 +179,15 @@ def _layer_matter_gauge_x(self) -> cirq.Circuit: return cirq.Circuit.from_moments(cirq.Moment(moment)) def layer_hadamard(self, which_qubits="all") -> cirq.Circuit: + """Creates a circuit layer containing Hadamard gates on the specified qubits. + + Args: + which_qubits: Specifies which qubits get the Hadamard gates. + Valid values are "all", "gauge", or "matter". + + Returns: + A cirq.Circuit containing a single Moment of Hadamard operations. + """ moment = [] if which_qubits == "gauge": for q in self.gauge_qubits: @@ -174,12 +201,28 @@ def layer_hadamard(self, which_qubits="all") -> cirq.Circuit: return cirq.Circuit.from_moments(cirq.Moment(moment)) def layer_measure(self) -> cirq.Circuit: + """Creates a circuit layer containing measurement operations on all qubits. + + Returns: + A cirq.Circuit containing a single Moment of measurement operations + with the key "m". + """ moment = [] for q in self.all_qubits: moment.append(cirq.measure(q, key="m")) return cirq.Circuit.from_moments(cirq.Moment(moment)) - def trotter_circuit(self, n_cycles, two_qubit_gate="cz_simultaneous"): + def trotter_circuit(self, n_cycles, two_qubit_gate="cz_simultaneous") -> cirq.Circuit: + """Constructs the second-order Trotter circuit for the DFL Hamiltonian. + + Args: + n_cycles: The number of Trotter steps/cycles to include in the circuit. + two_qubit_gate: The type of two-qubit gate to use in the layers. + Valid values are "cz_simultaneous" or "cphase_simultaneous". + + Returns: + The complete cirq.Circuit for the Trotter evolution. + """ if two_qubit_gate == "cz_simultaneous": return self._layer_floquet_cz_simultaneous() * n_cycles @@ -201,6 +244,22 @@ def trotter_circuit(self, n_cycles, two_qubit_gate="cz_simultaneous"): def layer_floquet( self, two_qubit_gate="cz_simultaneous", layer="middle" ) -> cirq.Circuit: + + """Constructs a layer of the Trotter circuit. + + Args: + two_qubit_gate: The type of two-qubit gate to use. + Valid values are "cz_simultaneous" or "cphase_simultaneous". + layer: Specifies which piece of the sequence to return. + Valid values are "middle", "last", or "first". + + Returns: + The cirq.Circuit representing the requested Trotter layer. + + Raises: + ValueError: If an invalid option for `two_qubit_gate` or `layer` is given. + """ + if two_qubit_gate == "cz_simultaneous": return self._layer_floquet_cz_simultaneous() @@ -220,7 +279,6 @@ def _energy_bump_initial_state( self, matter_config: str, excited_qubits: Sequence[cirq.GridQubit], - two_qubit_gate="cz_simultaneous", ) -> cirq.Circuit: """Circuit for energy bump initial state. It typically consists of single qubit gates and the basis change circuit U_B. @@ -484,8 +542,8 @@ def _compute_observables_one_instance_dual_basis( q_nbrs = [q_n for q_n in q.neighbors() if q_n in self.all_qubits] for q_n in q_nbrs: prod_x *= bits_x_rescaled[ - :, self.all_qubits.index(cast(cirq.GridQubit, q_n)) - ] + :, self.all_qubits.index(cast(cirq.GridQubit, q_n)) + ] matter_x_terms.append(prod_x) exp_matter_x = np.mean(matter_x_terms, axis=1) @@ -531,6 +589,18 @@ def compute_observables_dual_basis( bits_z: Sequence[npt.NDArray[np.int8]], bits_x: Sequence[npt.NDArray[np.int8]], ) -> Sequence[np.ndarray]: + """This calculates the mean and variance over multiple instances + of measurement outcomes for the matter X, gauge X, interaction + and energy terms. + + Args: + bits_z: Measurement outcomes in the Z basis. + bits_x: Measurement outcomes in the X basis. + + Returns: + A sequence of NumPy arrays, where each array contains the mean and + variance for the respective observable. + """ num_instances = len(bits_z) if num_instances == 1: return self._compute_observables_one_instance_dual_basis( @@ -595,6 +665,14 @@ def _postselect_on_charge_one_instance_dual_basis( return bits[selected] def postselect_on_charge_dual_basis(self, bits: npt.NDArray[np.int8]) -> np.ndarray: + """Performs charge post-selection on measurement outcomes. + + Args: + bits: The raw measurement outcomes. + + Returns: + A NumPy array containing only the post-selected measurement instances. + """ num_instances = bits.shape[0] bits_ps = [] for ins in range(num_instances): @@ -611,7 +689,27 @@ def get_2d_dfl_experiment_circuits( n_instances: int = 10, two_qubit_gate: str = "cz_simultaneous", basis="dual", - ): + ) -> List[cirq.Circuit]: + """Generates the set of circuits needed for the 2D DFL experiment. + + Args: + initial_state: The initial state preparation. + Valid values are "single_sector" or "superposition". + n_cycles: The number of Trotter steps (cycles) to simulate. + excited_qubits: Qubits to be excited in the initial state. + n_instances: The number of instances to generate + two_qubit_gate: The type of two-qubit gate to use in the Trotter step. + Valid values are "cz_simultaneous" or "cphase_simultaneous". + basis: The basis for the final circuit structure. + Valid values are "lgt" or "dual". + + Returns: + A list of all generated cirq.Circuit objects. + + Raises: + ValueError: If an invalid option for `initial_state` + or `basis` is given. + """ if initial_state == "single_sector": initial_circuit = self._energy_bump_initial_state( "single_sector", excited_qubits diff --git a/recirq/dfl/dfl_experiment.py b/recirq/dfl/dfl_experiment.py index 7782fdc5..cd6ca52d 100644 --- a/recirq/dfl/dfl_experiment.py +++ b/recirq/dfl/dfl_experiment.py @@ -91,6 +91,13 @@ def __init__( self.gauge_sites = np.arange(1, len(qubits), 2) def save_to_file(self, filename: str): + """Saves the experiment parameters and measurement + results to a file. + + Args: + filename: The path and filename to which the data + dictionary will be pickled. + """ d_to_save = { "measurements": self.measurements, "readout_ideal_bitstrs": self.readout_ideal_bitstrs, @@ -112,6 +119,12 @@ def save_to_file(self, filename: str): pickle.dump(d_to_save, open(filename, "wb")) def load_from_file(self, filename: str): + """Loads experiment parameters and measurement results from a saved file. + + Args: + filename: The path and filename from which to load the data + dictionary. + """ d = pickle.load(open(filename, "rb")) self.measurements = d["measurements"] self.readout_ideal_bitstrs = d["readout_ideal_bitstrs"] @@ -139,6 +152,21 @@ def generate_circuit_instances( dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), zero_trotter: bool = False, ) -> list[cirq.Circuit]: + """Generates the circuit instances for a given initial state, number of cycles, + and measurement basis. + + Args: + initial_state: The initial state. Must be "gauge_invariant" or "superposition". + ncycles: The number of Trotter steps. + basis: The measurement basis. Must be "z" or "x". + gauge_compile: Whether to apply gauge compiling. + dynamical_decouple: Whether to apply dynamical decoupling. + dd_sequence: The dynamical decoupling sequence to use. + zero_trotter: Whether to set the Trotter step size to 0. + + Returns: + A list of the generated circuit instances. + """ if initial_state == "gauge_invariant": initial_state = "single_sector" @@ -178,6 +206,14 @@ def generate_circuit_instances( def create_readout_benchmark_circuits( self, num_random_bitstrings: int = 30 ) -> tuple[np.ndarray, list[cirq.Circuit]]: + """Creates circuits for readout error benchmarking. + Args: + num_random_bitstrings: The number of random bitstrings to use. + Returns: + A tuple containing: + 1. The ideal bitstrings + 2. A list of the corresponding readout benchmark circuits. + """ n = len(self.qubits) x_or_I = lambda bit: cirq.X if bit == 1 else cirq.I bitstrs = self.rng.integers(0, 2, size=(num_random_bitstrings, n)) @@ -195,6 +231,8 @@ def create_readout_benchmark_circuits( return bitstrs, random_circuits def identify_ideal_readout_bitstrings_from_circuits(self): + """Identifies the ideal expected bitstrings for the readout calibration circuits. + """ # first identify the number of readout bitstrings: for num_bitstrs, circuit in enumerate(self.all_circuits[::-1]): if len(circuit) > 2: @@ -215,6 +253,16 @@ def generate_all_circuits( dd_sequence: tuple[cirq.Gate, ...] = (cirq.X, cirq.Y, cirq.X, cirq.Y), num_random_bitstrings: int = 30, ): + """Generates and stores all circuits needed for the DFL experiment. + The results are stored in `self.all_circuits`. + + Args: + gauge_compile: Whether to apply gauge compiling instances. + dynamical_decouple: Whether to apply dynamical decoupling to the circuits. + dd_sequence: The dynamical decoupling sequence to use. + num_random_bitstrings: The number of random bitstrings to use for + readout error benchmarking. + """ if not gauge_compile: self.num_gc_instances = 1 num_gc_instances = self.num_gc_instances @@ -224,10 +272,10 @@ def generate_all_circuits( all_circuits = [] with tqdm( total=2 - * 2 - * len(zero_trotter_options) - * len(self.cycles) - * num_gc_instances + * 2 + * len(zero_trotter_options) + * len(self.cycles) + * num_gc_instances ) as pbar: for initial_state in ["gauge_invariant", "superposition"]: for basis in ["z", "x"]: @@ -265,7 +313,8 @@ def load_circuits_from_file( Args: filename: The filename from which to load the circuits. - old_qubits_list: The previous ordered list of qubits if mapping to different qubits. + old_qubits_list: The previous ordered list of qubits if + mapping to different qubits. """ circuits = pickle.load(open(filename, "rb")) if old_qubits_list is not None and old_qubits_list != self.qubits: @@ -298,7 +347,7 @@ def shuffle_circuits_and_save( circuits_shuffled = [circuits[_] for _ in indices] # save the shuffled circuits for start_idx in tqdm(range(0, len(circuits), batch_size)): - circuits_i = circuits_shuffled[start_idx : (start_idx + batch_size)] + circuits_i = circuits_shuffled[start_idx: (start_idx + batch_size)] pickle.dump( circuits_i, open( @@ -513,8 +562,8 @@ def run_experiment_from_saved_circuits( ) end_index = start_index + self.num_gc_instances repetitions[start_index:end_index] = [ - reps_list[cycle_number] - ] * self.num_gc_instances + reps_list[cycle_number] + ] * self.num_gc_instances # now apply the shuffle repetitions = [repetitions[index] for index in indices] @@ -533,11 +582,11 @@ def run_experiment_from_saved_circuits( for circuit in circuits_i ] print( - f"Shots this batch: {sum(repetitions[start_idx : (start_idx + batch_size)])}" + f"Shots this batch: {sum(repetitions[start_idx: (start_idx + batch_size)])}" ) result = self.sampler.run_batch( circuits_i, - repetitions=repetitions[start_idx : (start_idx + batch_size)], + repetitions=repetitions[start_idx: (start_idx + batch_size)], ) results_i = [res[0].measurements["m"].astype(bool) for res in result] if not os.path.isdir(self.save_directory + f"/shuffled_results"): @@ -578,6 +627,13 @@ def load_shuffled_measurements(self): self.measurements = [measurements_shuffled[i] for i in inv_map] def extract_readout_error_rates(self) -> None: + """Calculates the readout error rates e0 and e1 from the readout + benchmarking measurements. The results are stored in the `self.e0` + (1 -> 0 error rate) and `self.e1` (0 -> 1 error rate) attributes. + + Returns: + None. + """ ideal_bitstrs = self.readout_ideal_bitstrs num_bitstrs = len(ideal_bitstrs) readout_measurements = np.array(self.measurements[-num_bitstrs:]) @@ -647,7 +703,7 @@ def extract_measurements( ) measurements = np.array( - self.measurements[start_index : (start_index + self.num_gc_instances)] + self.measurements[start_index: (start_index + self.num_gc_instances)] ) repetitions = len(measurements[0]) measurements = measurements.reshape( @@ -1514,8 +1570,8 @@ def extract_matter_x_single_cycle( e1[idx] = 0.0 for q in self.lgtdfl.matter_qubits: qubits_i = [ - q_n for q_n in q.neighbors() if q_n in self.lgtdfl.all_qubits - ] + [q] + q_n for q_n in q.neighbors() if q_n in self.lgtdfl.all_qubits + ] + [q] indices = np.array( [ self.lgtdfl.all_qubits.index(cast(cirq.GridQubit, qi)) diff --git a/recirq/dfl/dfl_experiment_test_manual.py b/recirq/dfl/dfl_experiment_test_manual.py index 1b7792f3..387ea47e 100644 --- a/recirq/dfl/dfl_experiment_test_manual.py +++ b/recirq/dfl/dfl_experiment_test_manual.py @@ -10,7 +10,10 @@ def test_dfl_experiment_3x3(): - """This test takes a minute to run, so I'm leaving it as a manual test.""" + """Performs a comprehensive integration test of the 2D DFL experiment setup. + + Note: This test is designed to be run manually as it is time-consuming. + """ qubits = cirq.GridQubit.rect(3, 3, 0, 0) sampler = cirq.Simulator() for num_gc_instances in [1, 2]: @@ -33,7 +36,7 @@ def test_dfl_experiment_3x3(): repetitions_post_selection=np.ones( len(dfl_expt.cycles), dtype=int ) - * 10_000, + * 10_000, ) # check readout error rates @@ -43,7 +46,7 @@ def test_dfl_experiment_3x3(): # check gauge_x signs = np.ones(len(dfl_expt.lgtdfl._gauge_indices())) signs[0] = -1 - gauge_expected = dfl_expt.h / np.sqrt(dfl_expt.h**2 + 1) * signs + gauge_expected = dfl_expt.h / np.sqrt(dfl_expt.h ** 2 + 1) * signs for readout_mitigate in [False, True]: for initial_state in ["gauge_invariant", "superposition"]: @@ -71,7 +74,7 @@ def test_dfl_experiment_3x3(): gauge_x[0, 0], gauge_x[1, 0], atol=5 - * np.sqrt( + * np.sqrt( gauge_x[0, 1] ** 2 + gauge_x[1, 1] ** 2 ), @@ -114,7 +117,7 @@ def test_dfl_experiment_3x3(): matter_x[0, 0], matter_x[1, 0], atol=5 - * np.sqrt( + * np.sqrt( matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2 ), @@ -138,14 +141,14 @@ def test_dfl_experiment_3x3(): matter_x[0, 0], matter_x[1, 0], atol=5 - * np.sqrt( + * np.sqrt( matter_x[0, 1] ** 2 + matter_x[1, 1] ** 2 ), ) ) # check interaction - interaction_expected = 1 / np.sqrt(dfl_expt.h**2 + 1) * signs + interaction_expected = 1 / np.sqrt(dfl_expt.h ** 2 + 1) * signs for readout_mitigate in [False, True]: for initial_state in ["gauge_invariant", "superposition"]: for zero_trotter in [False, True]: @@ -172,7 +175,7 @@ def test_dfl_experiment_3x3(): interaction[0, 0], interaction[1, 0], atol=5 - * np.sqrt( + * np.sqrt( interaction[0, 1] ** 2 + interaction[1, 1] ** 2 ),