Source code for tensorcircuit.basecircuit

"""
Quantum circuit: common methods for all circuit classes as MixIn

Note:
  - Supports qubit (d = 2) and qudit (d >= 2) systems.
  - For string-encoded samples/counts when d <= 36, digits use base-d characters 0-9A-Z (A = 10, ..., Z = 35).
"""

# pylint: disable=invalid-name

from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from functools import partial
import logging

import numpy as np
import graphviz
import tensornetwork as tn

from . import gates
from .quantum import (
    QuOperator,
    QuVector,
    correlation_from_samples,
    correlation_from_counts,
    measurement_counts,
    sample_int2bin,
    sample2all,
    _infer_num_sites,
    _decode_basis_label,
    onehot_d_tensor,
)
from .abstractcircuit import AbstractCircuit
from .cons import npdtype, backend, dtypestr, contractor, rdtypestr
from .simplify import _split_two_qubit_gate
from .utils import arg_alias

logger = logging.getLogger(__name__)

Gate = gates.Gate
Tensor = Any


[docs] class BaseCircuit(AbstractCircuit): _nodes: List[tn.Node] _front: List[tn.Edge] is_dm: bool split: Optional[Dict[str, Any]] is_mps = False
[docs] @staticmethod def all_zero_nodes(n: int, prefix: str = "qb-", dim: int = 2) -> List[tn.Node]: prefix = "qd-" if dim > 2 else prefix l = [0.0] * dim l[0] = 1.0 nodes = [ tn.Node( np.array( l, dtype=npdtype, ), name=prefix + str(x), ) for x in range(n) ] return nodes
[docs] @staticmethod def front_from_nodes(nodes: List[tn.Node]) -> List[tn.Edge]: return [n.get_edge(0) for n in nodes]
def _tensors_to_nodes( self, tensors: Sequence[Tensor] ) -> Tuple[List[tn.Node], List[tn.Edge]]: """ Internal method to convert a sequence of MPS tensors to a list of nodes and front edges. (bond-left, physical, bond-right) order is assumed for MPS tensors. :param tensors: A sequence of tensors representing an MPS. :type tensors: Sequence[Tensor] :return: A tuple of (all nodes including dummy boundary nodes, front/physical edges). :rtype: Tuple[List[tn.Node], List[tn.Edge]] """ nodes = [ tn.Node(backend.cast(backend.convert_to_tensor(t), dtypestr)) for t in tensors ] for i in range(len(nodes) - 1): nodes[i].get_edge(2) ^ nodes[i + 1].get_edge(0) q_nodes = nodes all_nodes = list(q_nodes) for i, axis in zip([0, -1], [0, 2]): if q_nodes[i].get_edge(axis).dimension == 1: dummy = tn.Node( backend.cast(backend.convert_to_tensor([1.0]), dtypestr) ) q_nodes[i].get_edge(axis) ^ dummy.get_edge(0) all_nodes.append(dummy) front = [q_nodes[i].get_edge(1) for i in range(len(tensors))] return all_nodes, front
[docs] @staticmethod def coloring_nodes( nodes: Sequence[tn.Node], is_dagger: bool = False, flag: str = "inputs" ) -> None: r""" Tag nodes with metadata used for casual lightcone simplification and tracing. :param nodes: A sequence of tensornetwork nodes to tag. :type nodes: Sequence[tn.Node] :param is_dagger: Whether the nodes represent conjugate operations (U^\dagger), defaults to False. :type is_dagger: bool, optional :param flag: A label for the node type (e.g., "gate", "inputs", "operator"), defaults to "inputs". :type flag: str, optional """ for node in nodes: node.is_dagger = is_dagger node.flag = flag node.id = id(node)
[docs] @staticmethod def coloring_copied_nodes( nodes: Sequence[tn.Node], nodes0: Sequence[tn.Node], is_dagger: bool = True, flag: str = "inputs", ) -> None: """ Tag copied nodes while preserving the original node's identity for lightcone cancellation. :param nodes: A sequence of newly copied nodes. :type nodes: Sequence[tn.Node] :param nodes0: The sequence of original nodes from which `nodes` were copied. :type nodes0: Sequence[tn.Node] :param is_dagger: Whether the copied nodes represent conjugate operations, defaults to True. :type is_dagger: bool, optional :param flag: A label for the node type, defaults to "inputs". :type flag: str, optional """ for node, n0 in zip(nodes, nodes0): node.is_dagger = is_dagger node.flag = flag node.id = getattr(n0, "id", id(n0))
[docs] @staticmethod def copy_nodes( nodes: Sequence[tn.Node], dangling: Optional[Sequence[tn.Edge]] = None, conj: Optional[bool] = False, ) -> Tuple[List[tn.Node], List[tn.Edge]]: """ copy all nodes and dangling edges correspondingly :return: """ ndict, edict = tn.copy(nodes, conjugate=conj) newnodes = [] for n in nodes: newn = ndict[n] newn.is_dagger = conj newn.flag = getattr(n, "flag", "") + "copy" newn.id = getattr(n, "id", id(n)) newnodes.append(newn) newfront = [] if not dangling: dangling = [] for n in nodes: dangling.extend([e for e in n]) for e in dangling: newfront.append(edict[e]) return newnodes, newfront
def _copy( self, conj: Optional[bool] = False ) -> Tuple[List[tn.Node], List[tn.Edge]]: return self.copy_nodes(self._nodes, self._front, conj)
[docs] def apply_general_gate( self, gate: Union[Gate, QuOperator], *index: int, name: Optional[str] = None, split: Optional[Dict[str, Any]] = None, mpo: bool = False, diagonal: bool = False, ir_dict: Optional[Dict[str, Any]] = None, ) -> None: if name is None: name = "" gate_dict = { "gate": gate, "index": index, "name": name, "split": split, "mpo": mpo, "diagonal": diagonal, } if ir_dict is not None: ir_dict.update(gate_dict) else: ir_dict = gate_dict self._qir.append(ir_dict) assert len(index) == len(set(index)) index = tuple(i if i >= 0 else self._nqubits + i for i in index) noe = len(index) nq = self._nqubits applied = False split_conf = None if split is not None: split_conf = split elif self.split is not None: split_conf = self.split if not (mpo or diagonal): assert isinstance(gate, tn.Node) if (split_conf is not None) and noe == 2: results = _split_two_qubit_gate(gate, **split_conf) # max_err cannot be jax jitted if results is not None: n1, n2, is_swap = results self.coloring_nodes([n1, n2], flag="gate") # n1.flag = "gate" # n1.is_dagger = False n1.name = name # n1.id = id(n1) # n2.flag = "gate" # n2.is_dagger = False # n2.id = id(n2) n2.name = name if is_swap is False: n1[1] ^ self._front[index[0]] n2[2] ^ self._front[index[1]] self._nodes.append(n1) self._nodes.append(n2) self._front[index[0]] = n1[0] self._front[index[1]] = n2[1] if self.is_dm: [n1l, n2l], _ = self.copy_nodes([n1, n2], conj=True) n1l[1] ^ self._front[index[0] + nq] n2l[2] ^ self._front[index[1] + nq] self._nodes.append(n1l) self._nodes.append(n2l) self._front[index[0] + nq] = n1l[0] self._front[index[1] + nq] = n2l[1] else: n2[2] ^ self._front[index[0]] n1[1] ^ self._front[index[1]] self._nodes.append(n1) self._nodes.append(n2) self._front[index[0]] = n1[0] self._front[index[1]] = n2[1] if self.is_dm: [n1l, n2l], _ = self.copy_nodes([n1, n2], conj=True) n2l[1] ^ self._front[index[0] + nq] n1l[2] ^ self._front[index[1] + nq] self._nodes.append(n1l) self._nodes.append(n2l) self._front[index[0] + nq] = n1l[0] self._front[index[1] + nq] = n2l[1] applied = True if applied is False: gate.name = name self.coloring_nodes([gate], flag="gate") # gate.flag = "gate" # gate.is_dagger = False # gate.id = id(gate) self._nodes.append(gate) if self.is_dm: lgates, _ = self.copy_nodes([gate], conj=True) lgate = lgates[0] self._nodes.append(lgate) for i, ind in enumerate(index): gate.get_edge(i + noe) ^ self._front[ind] self._front[ind] = gate.get_edge(i) if self.is_dm: lgate.get_edge(i + noe) ^ self._front[ind + nq] self._front[ind + nq] = lgate.get_edge(i) elif mpo: # gate in MPO format assert isinstance(gate, QuOperator) gatec = gate.copy() self.coloring_nodes(gatec.nodes, flag="gate") for n in gatec.nodes: n.id = id(gate) n.name = name self._nodes += gatec.nodes if self.is_dm: gateconj = gate.adjoint() self.coloring_nodes(gateconj.nodes, flag="gate", is_dagger=True) for _, n in zip(gatec.nodes, gateconj.nodes): n.id = id(gate) n.name = name self._nodes += gateconj.nodes for i, ind in enumerate(index): gatec.in_edges[i] ^ self._front[ind] self._front[ind] = gatec.out_edges[i] if self.is_dm: gateconj.out_edges[i] ^ self._front[ind + nq] self._front[ind + nq] = gateconj.in_edges[i] elif diagonal: if isinstance(gate, tn.Node): mps_nodes = [gate] else: mps_nodes = gate.nodes self.coloring_nodes(mps_nodes, flag="gate") for n in mps_nodes: n.id = id(gate) n.name = name self._nodes += mps_nodes if self.is_dm: if isinstance(gate, tn.Node): gateconj_tensor = backend.conj(gate.tensor) gateconj_node = tn.Node(gateconj_tensor, name=name) gateconj_nodes = [gateconj_node] else: gateconj = gate.adjoint() gateconj_nodes = gateconj.nodes self.coloring_nodes(gateconj_nodes, flag="gate", is_dagger=True) for _, n in zip(mps_nodes, gateconj_nodes): n.id = id(gate) n.name = name self._nodes += gateconj_nodes for i, ind in enumerate(index): if isinstance(gate, tn.Node): phys_edge = gate[i] else: phys_edge = gate.out_edges[i] cn = tn.CopyNode(3, self._d, name=f"{name}_copy_{i}") self.coloring_nodes([cn], flag="gate") self._nodes.append(cn) cn[0] ^ self._front[ind] cn[1] ^ phys_edge self._front[ind] = cn[2] if self.is_dm: if isinstance(gate, tn.Node): phys_edge_conj = gateconj_nodes[0][i] else: phys_edge_conj = gateconj.in_edges[i] cn_conj = tn.CopyNode(3, self._d, name=f"{name}_copy_{i}_conj") self.coloring_nodes([cn_conj], flag="gate", is_dagger=True) self._nodes.append(cn_conj) cn_conj[0] ^ self._front[ind + nq] cn_conj[1] ^ phys_edge_conj self._front[ind + nq] = cn_conj[2] self.state_tensor = None # refresh the state cache
apply = apply_general_gate def _copy_state_tensor( self, conj: bool = False, reuse: bool = True ) -> Tuple[List[tn.Node], List[tn.Edge]]: if reuse: t = getattr(self, "state_tensor", None) if t is None: nodes, d_edges = self._copy() t = contractor(nodes, output_edge_order=d_edges) setattr(self, "state_tensor", t) ndict, edict = tn.copy([t], conjugate=conj) newnodes = [] newnodes.append(ndict[t]) newfront = [] for e in t.edges: newfront.append(edict[e]) return newnodes, newfront return self._copy(conj)
[docs] def expectation_before( self, *ops: Tuple[tn.Node, List[int]], reuse: bool = True, **kws: Any, ) -> List[tn.Node]: """ Get the tensor network in the form of a list of nodes for the expectation calculation before the real contraction :param reuse: _description_, defaults to True :type reuse: bool, optional :raises ValueError: _description_ :return: _description_ :rtype: List[tn.Node] """ nq = self._nqubits if self.is_dm is True: nodes, newdang = self._copy_state_tensor(reuse=reuse) else: nodes1, edge1 = self._copy_state_tensor(reuse=reuse) nodes2, edge2 = self._copy_state_tensor(conj=True, reuse=reuse) nodes = nodes1 + nodes2 newdang = edge1 + edge2 occupied = set() for op, index in ops: if not isinstance(op, tn.Node): # op is only a matrix op = backend.reshaped(op, d=self._d) op = backend.cast(op, dtype=dtypestr) op = gates.Gate(op) else: op.tensor = backend.cast(op.tensor, dtype=dtypestr) if isinstance(index, int): index = [index] index = tuple(i if i >= 0 else self._nqubits + i for i in index) # type: ignore noe = len(index) for j, e in enumerate(index): if e in occupied: raise ValueError("Cannot measure two operators in one index") newdang[e + nq] ^ op.get_edge(j) newdang[e] ^ op.get_edge(j + noe) occupied.add(e) self.coloring_nodes([op], flag="operator") # op.flag = "operator" # op.is_dagger = False # op.id = id(op) nodes.append(op) for j in range(nq): if j not in occupied: # edge1[j].is_dangling invalid here! newdang[j] ^ newdang[j + nq] return nodes
[docs] def perfect_sampling(self, status: Optional[Tensor] = None) -> Tuple[str, float]: """ Sampling base-d strings (0-9A-Z when d <= 36) from the circuit output based on quantum amplitudes. Reference: arXiv:1201.3974. :param status: external randomness, with shape [nqubits], defaults to None :type status: Optional[Tensor] :return: Sampled base-d string and the corresponding theoretical probability. :rtype: Tuple[str, float] """ return self.measure_jit(*range(self._nqubits), with_prob=True, status=status)
[docs] def measure_jit( self, *index: int, with_prob: bool = False, status: Optional[Tensor] = None ) -> Tuple[Tensor, Tensor]: """ Take measurement on the given site indices (computational basis). This method is jittable is and about 100 times faster than unjit version! :param index: Measure on which site (wire) index. :type index: int :param with_prob: If true, theoretical probability is also returned. :type with_prob: bool, optional :param status: external randomness, with shape [index], defaults to None :type status: Optional[Tensor] :return: The sample output and probability (optional) of the quantum line. :rtype: Tuple[Tensor, Tensor] """ # finally jit compatible ! and much faster than unjit version ! (100x) sample: List[Tensor] = [] one_r = backend.cast(backend.convert_to_tensor(1.0), rdtypestr) p = one_r for k, j in enumerate(index): if self.is_dm is False: nodes1, edge1 = self._copy() nodes2, edge2 = self._copy(conj=True) newnodes = nodes1 + nodes2 else: newnodes, newfront = self._copy() nfront = len(newfront) // 2 edge2 = newfront[nfront:] edge1 = newfront[:nfront] for i, e in enumerate(edge1): if i != j: e ^ edge2[i] for i in range(k): if self._d == 2: m = (1 - sample[i]) * gates.array_to_tensor( np.array([1, 0]) ) + sample[i] * gates.array_to_tensor(np.array([0, 1])) else: m = onehot_d_tensor(sample[i], d=self._d) g1 = Gate(m) self.coloring_nodes([g1], flag="measurement") newnodes.append(g1) g1.get_edge(0) ^ edge1[index[i]] g2 = Gate(m) self.coloring_nodes([g2], flag="measurement", is_dagger=True) newnodes.append(g2) g2.get_edge(0) ^ edge2[index[i]] rho = ( 1 / backend.cast(p, dtypestr) * contractor(newnodes, output_edge_order=[edge1[j], edge2[j]]).tensor ) if self._d == 2: pu = backend.real(rho[0, 0]) if status is None: r = backend.implicit_randu()[0] else: r = status[k] r = backend.real(backend.cast(r, dtypestr)) eps = 0.31415926 * 1e-12 sign = ( backend.sign(r - pu + eps) / 2 + 0.5 ) # in case status is exactly 0.5 sign = backend.convert_to_tensor(sign) sign = backend.cast(sign, dtype=rdtypestr) sign_complex = backend.cast(sign, dtypestr) sample.append(sign_complex) p = p * (pu * (-1) ** sign + sign) else: pu = backend.clip( backend.real(backend.diagonal(rho)), backend.convert_to_tensor(0.0), backend.convert_to_tensor(1.0), ) pu = pu / backend.sum(pu) if status is None: ind = backend.implicit_randc( a=backend.arange(self._d), shape=1, p=backend.cast(pu, rdtypestr), ) else: one_r = backend.cast(backend.convert_to_tensor(1.0), rdtypestr) st = backend.cast(status[k : k + 1], rdtypestr) ind = backend.probability_sample( shots=1, p=backend.cast(pu, rdtypestr), status=one_r - st, ) k_out = backend.cast(ind[0], "int32") sample.append(backend.cast(k_out, rdtypestr)) p = p * backend.cast(pu[k_out], rdtypestr) sample = backend.real(backend.stack(sample)) if with_prob: return sample, p else: return sample, -1.0
measure = measure_jit
[docs] def amplitude_before(self, l: Union[str, Tensor]) -> List[Gate]: r""" Returns the tensornetwor nodes for the amplitude of the circuit given the bitstring l. For state simulator, it computes :math:`\langle l\vert \psi\rangle`, for density matrix simulator, it computes :math:`Tr(\rho \vert l\rangle \langle 1\vert)` Note how these two are different up to a square operation. :param l: The bitstring of 0 and 1s. :type l: Union[str, Tensor] :return: The tensornetwork nodes for the amplitude of the circuit. :rtype: List[Gate] """ no, d_edges = self._copy() if isinstance(l, str): l = _decode_basis_label(l, n=self._nqubits, dim=self._d) l = backend.convert_to_tensor(l) endns = onehot_d_tensor(l, d=self._d) ms = [] if self.is_dm: msconj = [] for i in range(self._nqubits): n = tn.Node(endns[i]) self.coloring_nodes([n], flag="measurement") ms.append(n) d_edges[i] ^ n.get_edge(0) if self.is_dm: nconj = tn.Node(endns[i]) self.coloring_copied_nodes( [nconj], [n], flag="measurement", is_dagger=True ) msconj.append(nconj) d_edges[i + self._nqubits] ^ nconj.get_edge(0) no.extend(ms) if self.is_dm: no.extend(msconj) return no
[docs] def amplitude(self, l: Union[str, Tensor]) -> Tensor: r""" Returns the amplitude of the circuit given the bitstring l. For state simulator, it computes :math:`\langle l\vert \psi\rangle`, for density matrix simulator, it computes :math:`Tr(\rho \vert l\rangle \langle 1\vert)` Note how these two are different up to a square operation. :Example: >>> c = tc.Circuit(2) >>> c.X(0) >>> c.amplitude("10") array(1.+0.j, dtype=complex64) >>> c.CNOT(0, 1) >>> c.amplitude("11") array(1.+0.j, dtype=complex64) :param l: The bitstring of 0 and 1s. :type l: Union[str, Tensor] :return: The amplitude of the circuit. :rtype: tn.Node.tensor """ no = self.amplitude_before(l) return contractor(no).tensor
[docs] def probability(self) -> Tensor: """ get the d^n length probability vector over computational basis :return: probability vector of shape [dim**n] :rtype: Tensor """ s = self.state() # type: ignore if self.is_dm is False: amp = backend.reshape(s, [-1]) p = backend.real(backend.abs(amp) ** 2) else: diag = backend.diagonal(s) p = backend.real(backend.reshape(diag, [-1])) return p
def _merge_qir_with_extra(self) -> List[Dict[str, Any]]: from .translation import _merge_extra_qir return _merge_extra_qir(self._qir, getattr(self, "_extra_qir", [])) @staticmethod def _is_measure_instruction_name(name: str) -> bool: return name.upper() in ["MEASURE", "M", "MZ", "MR"] @staticmethod def _is_random_instruction_name(name: str) -> bool: return name.upper() in [ "MEASURE", "M", "MZ", "MR", "RESET", "DEPOLARIZING", "PAULI", ] def _count_detector_random_events( self, merged_qir: Sequence[Dict[str, Any]] ) -> int: c = 0 for d in merged_qir: if d.get("is_channel", False): if self._is_random_instruction_name(str(d.get("name", ""))): c += 1 continue if d.get("instruction", False) and self._is_random_instruction_name( str(d.get("name", "")) ): c += 1 return c def _resolve_detectors( self, merged_qir: Sequence[Dict[str, Any]] ) -> List[List[int]]: measured_so_far = 0 resolved: List[List[int]] = [] for d in merged_qir: if not d.get("instruction", False): continue name = str(d.get("name", "")).upper() if self._is_measure_instruction_name(name): measured_so_far += 1 continue if name != "DETECTOR": continue lookback_indices = [int(v) for v in d.get("index", [])] base = int(d.get("current_m_count", measured_so_far)) abs_indices = [] for v in lookback_indices: rid = base + v if v < 0 else v if rid < 0: raise ValueError( f"Invalid detector lookback index {v} with base {base}." ) if rid >= measured_so_far: raise ValueError( f"Detector index {rid} is out of range for {measured_so_far} measurements." ) abs_indices.append(rid) resolved.append(abs_indices) return resolved def _record_qubits_terminal_measure_only( self, merged_qir: Sequence[Dict[str, Any]] ) -> List[int]: measured_so_far = 0 record_to_qubit: Dict[int, int] = {} for d in merged_qir: if not d.get("instruction", False): continue name = str(d.get("name", "")).upper() if name in ["DETECTOR", "BARRIER", "TICK", "QUBIT_COORDS", "SHIFT_COORDS"]: continue if name in ["MEASURE", "M", "MZ"]: if int(d.get("pos", len(self._qir))) < len(self._qir): raise NotImplementedError( "allow_state=True currently supports terminal measure_instruction only." ) rid = int(d.get("record_index", measured_so_far)) record_to_qubit[rid] = int(d["index"][0]) measured_so_far += 1 continue raise NotImplementedError( f"allow_state=True does not support instruction `{name}`." ) if measured_so_far == 0: raise ValueError("No measurements defined for detector sampling.") record_qubits = [-1] * measured_so_far for rid, q in record_to_qubit.items(): if rid < measured_so_far: record_qubits[rid] = q if any(q < 0 for q in record_qubits): raise ValueError( "Measurement records are incomplete for detector resolution." ) return record_qubits def _walk_detector_trajectory_qir( self, merged_qir: Sequence[Dict[str, Any]], on_channel: Optional[Callable[[Dict[str, Any], str, int], None]] = None, on_gate: Optional[Callable[[Dict[str, Any]], None]] = None, on_measure: Optional[Callable[[Dict[str, Any], int], None]] = None, on_mr: Optional[Callable[[Dict[str, Any], int], None]] = None, on_reset: Optional[Callable[[Dict[str, Any], int], None]] = None, on_noise: Optional[Callable[[Dict[str, Any], int], None]] = None, ) -> None: active = [True] * self._nqubits for d in merged_qir: name = str(d.get("name", "")).upper() if d.get("is_channel", False): q = int(d["index"][0]) if not active[q]: raise NotImplementedError( "Noise after measure requires reset first." ) if on_channel is not None: on_channel(d, name, q) continue if not d.get("instruction", False): inds = [int(i) for i in d.get("index", [])] for q in inds: if q < self._nqubits and not active[q]: raise NotImplementedError( "Gate after measure requires reset_instruction first." ) if on_gate is not None: on_gate(d) continue if name in ["MEASURE", "M", "MZ"]: q = int(d["index"][0]) if not active[q]: raise NotImplementedError("Repeated measure on inactive line.") if on_measure is not None: on_measure(d, q) active[q] = False continue if name == "MR": q = int(d["index"][0]) if not active[q]: raise NotImplementedError("MR on inactive line.") if on_mr is not None: on_mr(d, q) active[q] = True continue if name == "RESET": q = int(d["index"][0]) if on_reset is not None: on_reset(d, q) active[q] = True continue if name in ["DEPOLARIZING", "PAULI"]: q = int(d["index"][0]) if not active[q]: raise NotImplementedError("Noise on inactive line.") if on_noise is not None: on_noise(d, q) continue if name in ["DETECTOR", "BARRIER", "TICK", "QUBIT_COORDS", "SHIFT_COORDS"]: continue raise NotImplementedError( f"Unsupported instruction in sample_detector: {name}" ) def _validate_detector_trajectory_qir( self, merged_qir: Sequence[Dict[str, Any]] ) -> None: self._walk_detector_trajectory_qir(merged_qir) def _run_instruction_trajectory( self, merged_qir: Sequence[Dict[str, Any]], status_row: Optional[Tensor] = None, ) -> Tensor: c = type(self)(**self.circuit_param) records: List[Tensor] = [] status_i = 0 status_len = 0 if status_row is not None: status_len = int(backend.shape_tuple(status_row)[0]) def next_status() -> Optional[Tensor]: nonlocal status_i if status_row is None: return None if status_i >= status_len: raise ValueError( "Provided status is shorter than required random events." ) s = status_row[status_i] status_i += 1 return s def on_channel(d: Dict[str, Any], name: str, q: int) -> None: if name in ["DEPOLARIZING", "PAULI"]: params = d.get("channel_parameters", {}) px = float(params.get("px", 0.0)) py = float(params.get("py", 0.0)) pz = float(params.get("pz", 0.0)) c.depolarizing(q, px=px, py=py, pz=pz, status=next_status()) # type: ignore return self._apply_qir(c, [d], allow_channel=True) # type: ignore[arg-type] def on_gate(d: Dict[str, Any]) -> None: self._apply_qir(c, [d], allow_channel=True) # type: ignore[arg-type] def on_measure(_: Dict[str, Any], q: int) -> None: m = backend.cast(c.cond_measurement(q, status=next_status()), "int32") records.append(m) def on_mr(_: Dict[str, Any], q: int) -> None: m = backend.cast(c.cond_measurement(q, status=next_status()), "int32") records.append(m) c.conditional_gate(m, [gates.i(), gates.x()], q) # type: ignore def on_reset(_: Dict[str, Any], q: int) -> None: m = c.cond_measurement(q, status=next_status()) c.conditional_gate(m, [gates.i(), gates.x()], q) # type: ignore def on_noise(d: Dict[str, Any], q: int) -> None: params = d.get("parameters", {}) px = float(params.get("px", d.get("px", 0.0))) py = float(params.get("py", d.get("py", 0.0))) pz = float(params.get("pz", d.get("pz", 0.0))) c.depolarizing(q, px=px, py=py, pz=pz, status=next_status()) # type: ignore self._walk_detector_trajectory_qir( merged_qir, on_channel=on_channel, on_gate=on_gate, on_measure=on_measure, on_mr=on_mr, on_reset=on_reset, on_noise=on_noise, ) if status_row is not None and status_i != status_len: logger.warning("Provided status is longer than required random events.") if len(records) == 0: return backend.cast(backend.convert_to_tensor([]), "int32") return backend.cast(backend.stack(records), "int32") def _build_detector_tn_wht( self, record_qubits: Sequence[int], resolved_detectors: Sequence[Sequence[int]] ) -> Tuple[List[tn.Node], List[tn.Edge]]: hadamard = np.array([[1.0, 1.0], [1.0, -1.0]], dtype=npdtype) tracev = np.array([1.0, 1.0], dtype=npdtype) if self.is_dm: nodes, front = self._copy() ket_front = front[: self._nqubits] bra_front = front[self._nqubits : 2 * self._nqubits] nodes = list(nodes) else: ket_nodes, ket_front = self._copy() bra_nodes, bra_front = self._copy(conj=True) nodes = list(ket_nodes) + list(bra_nodes) record_edges: List[Optional[tn.Edge]] = [None] * len(record_qubits) qubit_records: Dict[int, List[int]] = {} for rid, q in enumerate(record_qubits): qubit_records.setdefault(q, []).append(rid) for q in range(self._nqubits): rids = qubit_records.get(q, []) if len(rids) == 0: ket_front[q] ^ bra_front[q] continue if len(rids) > 1: raise NotImplementedError( "allow_state=True currently supports at most one terminal measurement per qubit." ) qcopy = tn.CopyNode(3, 2) qcopy[0] ^ ket_front[q] qcopy[1] ^ bra_front[q] nodes.append(qcopy) record_edges[rids[0]] = qcopy[2] rid_usage_count: Dict[int, int] = {} for det in resolved_detectors: for rid in det: rid_usage_count[rid] = rid_usage_count.get(rid, 0) + 1 fanned_out_edges: Dict[int, List[tn.Edge]] = {} for rid, count in rid_usage_count.items(): redge = record_edges[rid] if redge is None: raise ValueError(f"Missing measurement edge for record {rid}.") if count == 1: fanned_out_edges[rid] = [redge] else: fanout = tn.CopyNode(count + 1, 2) nodes.append(fanout) redge ^ fanout[0] fanned_out_edges[rid] = [fanout[i + 1] for i in range(count)] rid_pop_index = {rid: 0 for rid in rid_usage_count} detector_edges: List[tn.Edge] = [] for det in resolved_detectors: dcopy = tn.CopyNode(len(det) + 1, 2) nodes.append(dcopy) for i, rid in enumerate(det): hnode = tn.Node(hadamard) nodes.append(hnode) edge_to_use = fanned_out_edges[rid][rid_pop_index[rid]] rid_pop_index[rid] += 1 edge_to_use ^ hnode[0] hnode[1] ^ dcopy[i] hout = tn.Node(hadamard) nodes.append(hout) dcopy[len(det)] ^ hout[0] detector_edges.append(hout[1]) for rid, redge in enumerate(record_edges): if rid in rid_usage_count: continue if redge is None: raise ValueError(f"Missing measurement edge for record {rid}.") tnode = tn.Node(tracev) nodes.append(tnode) redge ^ tnode[0] return nodes, detector_edges
[docs] def detector_probabilities(self) -> Tensor: """ Calculate the joint probability distribution of all detectors in the circuit. :return: A tensor representing the joint probability distribution. :rtype: Tensor """ merged_qir = self._merge_qir_with_extra() resolved_detectors = self._resolve_detectors(merged_qir) if len(resolved_detectors) == 0: raise ValueError("No detectors defined in the circuit.") nodes, detector_edges = self._build_detector_tn_from_qir( merged_qir, resolved_detectors ) pt = contractor(nodes, output_edge_order=detector_edges).tensor p = backend.abs(backend.real(pt)) return p / backend.sum(p)
[docs] def outcome_probability(self, state: Sequence[int]) -> Tensor: """ Calculate the probability of a specific detector outcome bitstring. :param state: The detector outcome bitstring as a sequence of 0s and 1s. :type state: Sequence[int] :return: The probability of the given outcome. :rtype: Tensor """ merged_qir = self._merge_qir_with_extra() resolved_detectors = self._resolve_detectors(merged_qir) if len(resolved_detectors) == 0: raise ValueError("No detectors defined in the circuit.") if len(state) != len(resolved_detectors): raise ValueError( f"State length {len(state)} does not match number of detectors {len(resolved_detectors)}." ) nodes, detector_edges = self._build_detector_tn_from_qir( merged_qir, resolved_detectors ) nodes_num, detector_edges_num = self.copy_nodes(nodes, detector_edges) for i, s in enumerate(state): s_val = backend.cast(backend.convert_to_tensor(s), "int32") p_vec = backend.onehot(s_val, 2) p_node = tn.Node(backend.cast(p_vec, dtypestr)) nodes_num.append(p_node) detector_edges_num[i] ^ p_node[0] p_num = contractor(nodes_num).tensor nodes_den, detector_edges_den = self.copy_nodes(nodes, detector_edges) tracev = tn.Node(backend.cast(backend.convert_to_tensor([1.0, 1.0]), dtypestr)) for i in range(len(detector_edges_den)): tnode = tn.Node(tracev.tensor) nodes_den.append(tnode) detector_edges_den[i] ^ tnode[0] p_den = contractor(nodes_den).tensor return backend.abs(backend.real(p_num)) / backend.abs(backend.real(p_den))
def _build_detector_tn_from_qir( self, merged_qir: Sequence[Dict[str, Any]], resolved_detectors: Sequence[Sequence[int]], ) -> Tuple[List[tn.Node], List[tn.Edge]]: if self._can_build_detector_from_existing_nodes(merged_qir): record_qubits = self._record_qubits_terminal_measure_only(merged_qir) return self._build_detector_tn_wht(record_qubits, resolved_detectors) n = self._nqubits work = self._new_detector_work_circuit() work._qir = [] work._extra_qir = [] active = [True] * n record_edges: Dict[int, tn.Edge] = {} measured_so_far = 0 hadamard = np.array([[1.0, 1.0], [1.0, -1.0]], dtype=npdtype) tracev = np.array([1.0, 1.0], dtype=npdtype) zero = np.array([1.0, 0.0], dtype=npdtype) for d in merged_qir: name = str(d.get("name", "")).upper() if not d.get("instruction", False): inds = [int(i) for i in d.get("index", [])] for q in inds: if q < n and not active[q]: raise NotImplementedError( "Gate/noise after measure requires reset_instruction first." ) self._apply_qir(work, [d], allow_channel=True) # type: ignore[arg-type] continue if name in ["MEASURE", "M", "MZ"]: q = int(d["index"][0]) if not active[q]: raise NotImplementedError("Repeated measure on inactive line.") qcopy = tn.CopyNode(3, 2) work._nodes.append(qcopy) qcopy[0] ^ work._front[q] qcopy[1] ^ work._front[q + n] rid = int(d.get("record_index", measured_so_far)) measured_so_far += 1 record_edges[rid] = qcopy[2] active[q] = False continue if name == "MR": q = int(d["index"][0]) if not active[q]: raise NotImplementedError("MR on inactive line.") qcopy = tn.CopyNode(3, 2) work._nodes.append(qcopy) qcopy[0] ^ work._front[q] qcopy[1] ^ work._front[q + n] rid = int(d.get("record_index", measured_so_far)) measured_so_far += 1 record_edges[rid] = qcopy[2] kn = tn.Node(zero) bn = tn.Node(zero) work._nodes.extend([kn, bn]) work._front[q] = kn[0] work._front[q + n] = bn[0] active[q] = True continue if name == "RESET": q = int(d["index"][0]) if active[q]: work._front[q] ^ work._front[q + n] kn = tn.Node(zero) bn = tn.Node(zero) work._nodes.extend([kn, bn]) work._front[q] = kn[0] work._front[q + n] = bn[0] active[q] = True continue if name in ["DETECTOR", "BARRIER", "TICK", "QUBIT_COORDS", "SHIFT_COORDS"]: continue if name in ["DEPOLARIZING", "PAULI"]: q = int(d["index"][0]) if not active[q]: raise NotImplementedError("Noise on inactive line.") params = d.get("parameters", {}) px = float(params.get("px", d.get("px", 0.0))) py = float(params.get("py", d.get("py", 0.0))) pz = float(params.get("pz", d.get("pz", 0.0))) work.depolarizing(q, px=px, py=py, pz=pz) # type: ignore continue raise NotImplementedError( f"Unsupported instruction `{name}` in detector TN." ) for q in range(n): if active[q]: work._front[q] ^ work._front[q + n] nodes = list(work._nodes) rid_usage_count: Dict[int, int] = {} for det in resolved_detectors: for rid in det: rid_usage_count[rid] = rid_usage_count.get(rid, 0) + 1 fanned_out_edges: Dict[int, List[tn.Edge]] = {} for rid, count in rid_usage_count.items(): if rid not in record_edges: raise ValueError( f"Detector references unknown measurement record {rid}." ) redge = record_edges[rid] if count == 1: fanned_out_edges[rid] = [redge] else: fanout = tn.CopyNode(count + 1, 2) nodes.append(fanout) redge ^ fanout[0] fanned_out_edges[rid] = [fanout[i + 1] for i in range(count)] rid_pop_index = {rid: 0 for rid in rid_usage_count} detector_edges: List[tn.Edge] = [] for det in resolved_detectors: dcopy = tn.CopyNode(len(det) + 1, 2) nodes.append(dcopy) for i, rid in enumerate(det): hnode = tn.Node(hadamard) nodes.append(hnode) edge_to_use = fanned_out_edges[rid][rid_pop_index[rid]] rid_pop_index[rid] += 1 edge_to_use ^ hnode[0] hnode[1] ^ dcopy[i] hout = tn.Node(hadamard) nodes.append(hout) dcopy[len(det)] ^ hout[0] detector_edges.append(hout[1]) for rid, redge in record_edges.items(): if rid in rid_usage_count: continue tnode = tn.Node(tracev) nodes.append(tnode) redge ^ tnode[0] return nodes, detector_edges def _new_detector_work_circuit(self) -> "BaseCircuit": return type(self)(**self.circuit_param) def _can_build_detector_from_existing_nodes( self, merged_qir: Sequence[Dict[str, Any]] ) -> bool: for d in merged_qir: if d.get("is_channel", False): return False if not d.get("instruction", False): continue name = str(d.get("name", "")).upper() if name in ["DETECTOR", "BARRIER", "TICK", "QUBIT_COORDS", "SHIFT_COORDS"]: continue if name in ["MEASURE", "M", "MZ"]: if int(d.get("pos", len(self._qir))) < len(self._qir): return False continue return False return True
[docs] def sample_detector( self, shots: int = 1, batch: Optional[int] = None, allow_state: bool = False, status: Optional[Tensor] = None, seed: Optional[int] = None, **kws: Any, ) -> Tensor: """ Sample detector outcomes from instruction-annotated circuits. :param shots: Number of samples to draw, defaults to 1. :type shots: int, optional :param batch: Number of samples to process in a single batch, defaults to None (equal to shots). :type batch: int, optional :param allow_state: If True, uses the full detector probability distribution for sampling (faster but memory-intensive); if False, uses an autoregressive sampling method based on the tensor network, defaults to False. :type allow_state: bool, optional :param status: Random numbers in [0, 1] used for sampling, defaults to None. If allow_state is True, shape should be [shots] or [shots, 1]; if allow_state is False, shape should be [shots, num_detectors]. :type status: Optional[Tensor], optional :param seed: Random seed for sampling, defaults to None. :type seed: Optional[int], optional :return: A boolean tensor containing the sampled detector outcomes with shape [shots, num_detectors]. :rtype: Tensor """ if "batch_size" in kws and "shots" not in kws: shots = int(kws["batch_size"]) if "shots" in kws: shots = int(kws["shots"]) if shots <= 0: raise ValueError("shots must be positive.") if batch is None: batch = shots if batch <= 0: raise ValueError("batch must be positive.") merged_qir = self._merge_qir_with_extra() resolved_detectors = self._resolve_detectors(merged_qir) if len(resolved_detectors) == 0: raise ValueError("No detectors defined in the circuit.") num_detectors = len(resolved_detectors) if allow_state: status_state: Optional[Tensor] = None if status is not None: status_state = backend.cast( backend.convert_to_tensor(status), rdtypestr ) ss = backend.shape_tuple(status_state) if len(ss) == 2: if int(ss[1]) != 1: raise ValueError( "allow_state=True expects status shape [shots] or [shots, 1]." ) status_state = backend.reshape(status_state, [int(ss[0])]) ss = backend.shape_tuple(status_state) if len(ss) != 1 or int(ss[0]) != shots: raise ValueError( "allow_state=True expects status shape [shots] or [shots, 1]." ) else: if seed is not None: g = backend.get_random_state(seed) status_state = backend.cast( backend.stateful_randu(g, shape=[shots]), rdtypestr ) p = backend.reshape(self.detector_probabilities(), [-1]) sample_int = backend.probability_sample(shots, p, status_state) sample_bin = sample_int2bin(sample_int, num_detectors, dim=2) return backend.cast(sample_bin, "bool") status2d: Optional[Tensor] if (not self.is_dm) and (not allow_state): self._validate_detector_trajectory_qir(merged_qir) num_random_events = self._count_detector_random_events(merged_qir) if status is not None: status2d = backend.cast(backend.convert_to_tensor(status), rdtypestr) ss = backend.shape_tuple(status2d) if len(ss) == 1: if shots != 1: raise ValueError("1D status is only valid when shots == 1.") status2d = backend.reshape(status2d, [1, int(ss[0])]) ss = backend.shape_tuple(status2d) if len(ss) != 2 or int(ss[0]) != shots: raise ValueError( "allow_state=False trajectory mode expects status shape [shots, num_random_events]." ) if int(ss[1]) != num_random_events: raise ValueError( f"status second dimension must equal number of random events ({num_random_events})." ) else: if num_random_events == 0: status2d = None elif seed is not None: g = backend.get_random_state(seed) status2d = backend.cast( backend.stateful_randu(g, shape=[shots, num_random_events]), rdtypestr, ) else: status2d = backend.cast( backend.implicit_randu(shape=[shots, num_random_events]), rdtypestr, ) two_i = backend.cast(backend.convert_to_tensor(2), "int32") def trajectory_one(row: Any) -> Tensor: records = self._run_instruction_trajectory(merged_qir, row) det_row: List[Tensor] = [] for rec_indices in resolved_detectors: inds = backend.cast( backend.convert_to_tensor(rec_indices), dtype="int32" ) bits = backend.gather1d(records, inds) parity = backend.mod(backend.sum(bits), two_i) det_row.append(backend.cast(parity, "int32")) return backend.stack(det_row) trajectory_batch = backend.jit(backend.vmap(trajectory_one)) if status2d is None: # num_random_events == 0 case, results are deterministic res = backend.jit(trajectory_one)(None) return backend.cast(backend.tile(res[None, :], [shots, 1]), "bool") shot_rows: List[Tensor] = [] start = 0 while start < shots: stop = min(start + batch, shots) sb = status2d[start:stop] shot_rows.append(trajectory_batch(sb)) start = stop return backend.cast(backend.concat(shot_rows, axis=0), "bool") if status is not None: status2d = backend.cast(backend.convert_to_tensor(status), rdtypestr) ss = backend.shape_tuple(status2d) if len(ss) == 1: if shots != 1: raise ValueError("1D status is only valid when shots == 1.") status2d = backend.reshape(status2d, [1, int(ss[0])]) ss = backend.shape_tuple(status2d) if len(ss) != 2 or int(ss[0]) != shots: raise ValueError( "allow_state=False expects status shape [shots, num_detectors]." ) if int(ss[1]) != num_detectors: raise ValueError( f"status second dimension must equal number of detectors ({num_detectors})." ) else: if seed is not None: g = backend.get_random_state(seed) status2d = backend.cast( backend.stateful_randu(g, shape=[shots, num_detectors]), rdtypestr ) else: status2d = backend.cast( backend.implicit_randu(shape=[shots, num_detectors]), rdtypestr ) base_nodes, base_detector_edges = self._build_detector_tn_from_qir( merged_qir, resolved_detectors ) tracev = backend.cast(backend.convert_to_tensor([1.0, 1.0]), dtypestr) def sample_one(status_row: Tensor) -> Tensor: assigned: List[Tensor] = [] for j in range(num_detectors): nodes, detector_edges = self.copy_nodes(base_nodes, base_detector_edges) for k, bit in enumerate(assigned): pnode = tn.Node(backend.cast(backend.onehot(bit, 2), dtypestr)) nodes.append(pnode) detector_edges[k] ^ pnode[0] for k in range(j + 1, num_detectors): tnode = tn.Node(tracev) nodes.append(tnode) detector_edges[k] ^ tnode[0] pt = contractor(nodes, output_edge_order=[detector_edges[j]]).tensor p2 = backend.abs(backend.real(backend.reshape(pt, [2]))) p2 = p2 / backend.sum(p2) sampled = backend.probability_sample(1, p2, status_row[j : j + 1])[0] assigned.append(backend.cast(sampled, "int32")) return backend.cast(backend.stack(assigned), "bool") sample_one_jit = backend.jit(sample_one) sample_batch_jit = backend.jit(backend.vmap(sample_one_jit)) outs = [] start = 0 while start < shots: stop = min(start + batch, shots) sb = status2d[start:stop] outs.append(sample_batch_jit(sb)) start = stop if len(outs) == 1: return outs[0] return backend.concat(outs, axis=0)
[docs] @partial(arg_alias, alias_dict={"format": ["format_"]}) def sample( self, batch: Optional[int] = None, allow_state: bool = False, readout_error: Optional[Sequence[Any]] = None, format: Optional[str] = None, random_generator: Optional[Any] = None, status: Optional[Tensor] = None, jittable: bool = True, ) -> Any: r""" batched sampling from state or circuit tensor network directly :param batch: number of samples, defaults to None :type batch: Optional[int], optional :param allow_state: if true, we sample from the final state if memory allows, True is preferred, defaults to False :type allow_state: bool, optional :param readout_error: readout_error, defaults to None :type readout_error: Optional[Sequence[Any]]. Tensor, List, Tuple :param format: sample format, defaults to None as backward compatibility check the doc in :py:meth:`tensorcircuit.quantum.measurement_results` Six formats of measurement counts results: "sample_int": # np.array([0, 0]) "sample_bin": # [np.array([1, 0]), np.array([1, 0])] "count_vector": # np.array([2, 0, 0, 0]) "count_tuple": # (np.array([0]), np.array([2])) "count_dict_bin": # {"00": 2, "01": 0, "10": 0, "11": 0} for cases d\in [11, 36], use 0-9A-Z digits (e.g., 'A' -> 10, ..., 'Z' -> 35); "count_dict_int": # {0: 2, 1: 0, 2: 0, 3: 0} :type format: Optional[str] :param random_generator: random generator, defaults to None :type random_generator: Optional[Any], optional :param status: external randomness given by tensor uniformly from [0, 1], if set, can overwrite random_generator, shape [batch] for `allow_state=True` and shape [batch, nqubits] for `allow_state=False` using perfect sampling implementation :type status: Optional[Tensor] :param jittable: when converting to count, whether keep the full size. if false, may be conflict external jit, if true, may fail for large scale system with actual limited count results :type jittable: bool, defaults true :return: List (if batch) of tuple (binary configuration tensor and corresponding probability) if the format is None, and consistent with format when given :rtype: Any """ # TODO(@refraction-ray): to be check: # 1. efficiency in different use case # 2. randomness interaction with jit when no explicit status if not allow_state: if random_generator is None: random_generator = backend.get_random_state() if batch is None: seed = backend.stateful_randu(random_generator, shape=[self._nqubits]) r = self.perfect_sampling(seed) if format is None: # batch=None, format=None, backward compatibility return r r = [r] # type: ignore else: r = [] # type: ignore if status is not None: assert backend.shape_tuple(status)[0] == batch for seed in status: r.append(self.perfect_sampling(seed)) # type: ignore else: @backend.jit def perfect_sampling(key: Any) -> Any: backend.set_random_state(key) return self.perfect_sampling() subkey = random_generator for _ in range(batch): key, subkey = backend.random_split(subkey) r.append(perfect_sampling(key)) # type: ignore if format is None: return r r = backend.stack([ri[0] for ri in r]) # type: ignore ch = backend.cast(r, "int32") # ch = sample_bin2int(r, self._nqubits, dim=self._d) else: # allow_state if batch is None: nbatch = 1 else: nbatch = batch p = self.probability() # readout error if readout_error is not None: p = self.readouterror_bs(readout_error, p) ch = backend.probability_sample(nbatch, p, status, random_generator) # if random_generator is None: # ch = backend.implicit_randc(a=a_range, shape=[nbatch], p=p) # else: # ch = backend.stateful_randc( # random_generator, a=a_range, shape=[nbatch], p=p # ) # confg = backend.mod( # backend.right_shift( # ch[..., None], backend.reverse(backend.arange(self._nqubits)) # ), # 2, # ) if format is None: # for backward compatibility confg = sample_int2bin(ch, self._nqubits, dim=self._d) prob = backend.gather1d(p, ch) r = list(zip(confg, prob)) # type: ignore if batch is None: r = r[0] # type: ignore return r if self._nqubits > 35: jittable = False return sample2all( sample=ch, n=self._nqubits, format=format, jittable=jittable, dim=self._d )
[docs] def sample_expectation_ps( self, x: Optional[Sequence[int]] = None, y: Optional[Sequence[int]] = None, z: Optional[Sequence[int]] = None, shots: Optional[int] = None, random_generator: Optional[Any] = None, status: Optional[Tensor] = None, readout_error: Optional[Sequence[Any]] = None, noise_conf: Optional[Any] = None, nmc: int = 1000, statusc: Optional[Tensor] = None, **kws: Any, ) -> Tensor: """ Compute the expectation with given Pauli string with measurement shots numbers :Example: >>> c = tc.Circuit(2) >>> c.H(0) >>> c.rx(1, theta=np.pi/2) >>> c.sample_expectation_ps(x=[0], y=[1]) -0.99999976 >>> readout_error = [] >>> readout_error.append([0.9,0.75]) >>> readout_error.append([0.4,0.7]) >>> c.sample_expectation_ps(x=[0], y=[1],readout_error = readout_error) >>> c = tc.Circuit(2) >>> c.cnot(0, 1) >>> c.rx(0, theta=0.4) >>> c.rx(1, theta=0.8) >>> c.h(0) >>> c.h(1) >>> error1 = tc.channels.generaldepolarizingchannel(0.1, 1) >>> error2 = tc.channels.generaldepolarizingchannel(0.06, 2) >>> readout_error = [[0.9, 0.75],[0.4, 0.7]] >>> noise_conf = NoiseConf() >>> noise_conf.add_noise("rx", error1) >>> noise_conf.add_noise("cnot", [error2], [[0, 1]]) >>> noise_conf.add_noise("readout", readout_error) >>> c.sample_expectation_ps(x=[0], noise_conf=noise_conf, nmc=10000) 0.44766843 :param x: index for Pauli X, defaults to None :type x: Optional[Sequence[int]], optional :param y: index for Pauli Y, defaults to None :type y: Optional[Sequence[int]], optional :param z: index for Pauli Z, defaults to None :type z: Optional[Sequence[int]], optional :param shots: number of measurement shots, defaults to None, indicating analytical result :type shots: Optional[int], optional :param random_generator: random_generator, defaults to None :type random_generator: Optional[Any] :param status: external randomness given by tensor uniformly from [0, 1], if set, can overwrite random_generator :type status: Optional[Tensor] :param readout_error: readout_error, defaults to None. Overrided if noise_conf is provided. :type readout_error: Optional[Sequence[Any]]. Tensor, List, Tuple :param noise_conf: Noise Configuration, defaults to None :type noise_conf: Optional[NoiseConf], optional :param nmc: repetition time for Monte Carlo sampling for noisfy calculation, defaults to 1000 :type nmc: int, optional :param statusc: external randomness given by tensor uniformly from [0, 1], defaults to None, used for noisfy circuit sampling :type statusc: Optional[Tensor], optional :return: [description] :rtype: Tensor """ from .noisemodel import sample_expectation_ps_noisfy if noise_conf is None: inputs_nodes, _ = self._copy_state_tensor() inputs = inputs_nodes[0].tensor if self.is_dm is False: c = type(self)(self._nqubits, inputs=inputs) # type: ignore else: c = type(self)(self._nqubits, dminputs=inputs) # type: ignore if x is None: x = [] if y is None: y = [] if z is None: z = [] for i in x: c.H(i) # type: ignore for i in y: c.rx(i, theta=np.pi / 2) # type: ignore s = c.state() # type: ignore if self.is_dm is False: p = backend.abs(s) ** 2 else: p = backend.abs(backend.diagonal(s)) # readout error if readout_error is not None: p = self.readouterror_bs(readout_error, p) x = list(x) y = list(y) z = list(z) if shots is None: mc = measurement_counts( p, counts=shots, format="count_vector", random_generator=random_generator, status=status, jittable=True, is_prob=True, ) r = correlation_from_counts(x + y + z, mc) else: mc = measurement_counts( p, counts=shots, format="sample_bin", random_generator=random_generator, status=status, jittable=True, is_prob=True, ) r = correlation_from_samples(x + y + z, mc, self._nqubits) # TODO(@refraction-ray): analytical standard deviation return r else: return sample_expectation_ps_noisfy( c=self, x=x, y=y, z=z, noise_conf=noise_conf, nmc=nmc, shots=shots, statusc=statusc, status=status, **kws, )
sexpps = sample_expectation_ps
[docs] def readouterror_bs( self, readout_error: Optional[Sequence[Any]] = None, p: Optional[Any] = None ) -> Tensor: """Apply readout error to original probabilities of bit string and return the noisy probabilities. :Example: >>> readout_error = [] >>> readout_error.append([0.9,0.75]) # readout error for qubit 0, [p0|0,p1|1] >>> readout_error.append([0.4,0.7]) # readout error for qubit 1, [p0|0,p1|1] :param readout_error: list of readout error for each qubits. :type readout_error: Optional[Sequence[Any]]. Tensor, List, Tuple :param p: probabilities of bit string :type p: Optional[Any] :rtype: Tensor """ # if isinstance(readout_error, tuple): # readout_error = list[readout_error] # type: ignore try: nqubit = int(readout_error.shape[0]) # type: ignore except AttributeError: nqubit = len(readout_error) # type: ignore readoutlist = [] for i in range(nqubit): readoutlist.append( [ [readout_error[i][0], 1 - readout_error[i][1]], # type: ignore [1 - readout_error[i][0], readout_error[i][1]], # type: ignore ] ) readoutlist = backend.cast( backend.convert_to_tensor(readoutlist), dtype=dtypestr ) ms = [Gate(readoutlist[i]) for i in range(nqubit)] p = backend.cast(p, dtypestr) p = Gate(backend.reshape2(p)) for i in range(nqubit): ms[i][1] ^ p[i] nodes = ms + [p] r = contractor(nodes, output_edge_order=[m[0] for m in ms]).tensor p = backend.reshape(r, [-1]) return backend.real(p)
[docs] def replace_inputs(self, inputs: Tensor) -> None: """ Replace the input state with the circuit structure unchanged. :param inputs: Input wavefunction. :type inputs: Tensor """ inputs = backend.reshape(inputs, [-1]) N = inputs.shape[0] n = _infer_num_sites(N, self._d) assert n == self._nqubits inputs = backend.reshape(inputs, [self._d] * n) if self.inputs is not None: self._nodes[0].tensor = inputs if self.is_dm: self._nodes[1].tensor = backend.conj(inputs) else: # TODO(@refraction-ray) replace several start as inputs raise NotImplementedError("not support replace with no inputs")
[docs] def cond_measurement(self, index: int, status: Optional[float] = None) -> Tensor: """ Measurement on z basis at ``index`` qubit based on quantum amplitude (not post-selection). The highlight is that this method can return the measured result as a int Tensor and thus maintained a jittable pipeline. :Example: >>> c = tc.Circuit(2) >>> c.H(0) >>> r = c.cond_measurement(0) >>> c.conditional_gate(r, [tc.gates.i(), tc.gates.x()], 1) >>> c.expectation([tc.gates.z(), [0]]), c.expectation([tc.gates.z(), [1]]) # two possible outputs: (1, 1) or (-1, -1) .. note:: In terms of ``DMCircuit``, this method returns nothing and the density matrix after this method is kept in mixed state without knowing the measuremet resuslts :param index: the site index for the Z-basis measurement :type index: int :return: 0 or 1 for Z-basis measurement outcome :rtype: Tensor """ return self.general_kraus( # type: ignore [np.array([[1.0, 0], [0, 0]]), np.array([[0, 0], [0, 1]])], index, status=status, name="measure", )
cond_measure = cond_measurement
[docs] def to_graphviz( self, graph: graphviz.Graph = None, include_all_names: bool = False, engine: str = "neato", ) -> graphviz.Graph: """ Not an ideal visualization for quantum circuit, but reserve here as a general approach to show the tensornetwork [Deprecated, use ``Circuit.vis_tex`` or ``Circuit.draw`` instead] """ # Modified from tensornetwork codebase nodes = self._nodes if graph is None: # pylint: disable=no-member graph = graphviz.Graph("G", engine=engine) for node in nodes: if not node.name.startswith("__") or include_all_names: label = node.name else: label = "" graph.node(str(id(node)), label=label) seen_edges = set() for node in nodes: for i, edge in enumerate(node.edges): if edge in seen_edges: continue seen_edges.add(edge) if not edge.name.startswith("__") or include_all_names: edge_label = edge.name + ": " + str(edge.dimension) else: edge_label = "" if edge.is_dangling(): # We need to create an invisible node for the dangling edge # to connect to. graph.node( "{}_{}".format(str(id(node)), i), label="", _attributes={"style": "invis"}, ) graph.edge( "{}_{}".format(str(id(node)), i), str(id(node)), label=edge_label, ) else: graph.edge( str(id(edge.node1)), str(id(edge.node2)), label=edge_label, ) return graph
[docs] def get_quvector(self) -> QuVector: """ Get the representation of the output state in the form of ``QuVector`` while maintaining the circuit uncomputed :return: ``QuVector`` representation of the output state from the circuit :rtype: QuVector """ _, edges = self._copy() return QuVector(edges)
quvector = get_quvector
[docs] def projected_subsystem(self, traceout: Tensor, left: Tuple[int, ...]) -> Tensor: """ remaining wavefunction or density matrix on sites in ``left``, with other sites fixed to given digits (0..d-1) as indicated by ``traceout`` :param traceout: can be jitted :type traceout: Tensor :param left: cannot be jitted :type left: Tuple :return: _description_ :rtype: Tensor """ traceout = backend.convert_to_tensor(traceout, dtype=dtypestr) all_endns = onehot_d_tensor(traceout, d=self._d) nodes, front = self._copy() L = self._nqubits edges = [] for i in range(len(traceout)): if i not in left: n = Gate(all_endns[i]) nodes.append(n) front[i] ^ n[0] else: edges.append(front[i]) if self.is_dm: for i in range(len(traceout)): if i not in left: n = Gate(all_endns[i]) nodes.append(n) front[i + L] ^ n[0] else: edges.append(front[i + L]) t = contractor(nodes, output_edge_order=edges) if self.is_dm: rho = backend.reshapem(t.tensor) return rho / (backend.trace(rho) + 1e-10) return backend.reshape(t.tensor / backend.norm(t.tensor), [-1])