Quellcode für qiskit.transpiler.passes.optimization.hoare_opt

# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2019.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Pass for Hoare logic circuit optimization."""
from qiskit.transpiler.basepasses import TransformationPass
from qiskit.circuit import QuantumRegister, ControlledGate, Gate
from qiskit.dagcircuit import DAGCircuit
from qiskit.extensions.unitary import UnitaryGate
from qiskit.quantum_info.operators.predicates import matrix_equal
from qiskit.circuit.exceptions import CircuitError
from qiskit.circuit.library.standard_gates import CZGate, CU1Gate, MCU1Gate
from qiskit.utils import optionals as _optionals


[Doku]@_optionals.HAS_Z3.require_in_instance class HoareOptimizer(TransformationPass): """This is a transpiler pass using Hoare logic circuit optimization. The inner workings of this are detailed in: https://arxiv.org/abs/1810.00375 """ def __init__(self, size=10): """ Args: size (int): size of gate cache, in number of gates Raises: MissingOptionalLibraryError: if unable to import z3 solver """ # This module is just a script that adds several post conditions onto existing classes. from . import _gate_extension super().__init__() self.solver = None self.variables = None self.gatenum = None self.gatecache = None self.varnum = None self.size = size def _gen_variable(self, qubit): """After each gate generate a new unique variable name for each of the qubits, using scheme: 'q[id]_[gatenum]', e.g. q1_0 -> q1_1 -> q1_2, q2_0 -> q2_1 Args: qubit (Qubit): qubit to generate new variable for Returns: BoolRef: z3 variable of qubit state """ import z3 varname = "q" + str(qubit) + "_" + str(self.gatenum[qubit]) var = z3.Bool(varname) self.gatenum[qubit] += 1 self.variables[qubit].append(var) return var def _initialize(self, dag): """create boolean variables for each qubit and apply qb == 0 condition Args: dag (DAGCircuit): input DAG to get qubits from """ import z3 for qbt in dag.qubits: self.gatenum[qbt] = 0 self.variables[qbt] = [] self.gatecache[qbt] = [] self.varnum[qbt] = {} x = self._gen_variable(qbt) self.solver.add(z3.Not(x)) def _add_postconditions(self, gate, ctrl_ones, trgtqb, trgtvar): """create boolean variables for each qubit the gate is applied to and apply the relevant post conditions. a gate rotating out of the z-basis will not have any valid post-conditions, in which case the qubit state is unknown Args: gate (Gate): gate to inspect ctrl_ones (BoolRef): z3 condition asserting all control qubits to 1 trgtqb (list((QuantumRegister, int))): list of target qubits trgtvar (list(BoolRef)): z3 variables corresponding to latest state of target qubits """ import z3 new_vars = [] for qbt in trgtqb: new_vars.append(self._gen_variable(qbt)) try: self.solver.add(z3.Implies(ctrl_ones, gate._postconditions(*(trgtvar + new_vars)))) except AttributeError: pass for i, tvar in enumerate(trgtvar): self.solver.add(z3.Implies(z3.Not(ctrl_ones), new_vars[i] == tvar)) def _test_gate(self, gate, ctrl_ones, trgtvar): """use z3 sat solver to determine triviality of gate Args: gate (Gate): gate to inspect ctrl_ones (BoolRef): z3 condition asserting all control qubits to 1 trgtvar (list(BoolRef)): z3 variables corresponding to latest state of target qubits Returns: bool: if gate is trivial """ import z3 trivial = False self.solver.push() try: triv_cond = gate._trivial_if(*trgtvar) except AttributeError: self.solver.add(ctrl_ones) trivial = self.solver.check() == z3.unsat else: if isinstance(triv_cond, bool): if triv_cond and len(trgtvar) == 1: self.solver.add(z3.Not(z3.And(ctrl_ones, trgtvar[0]))) sol1 = self.solver.check() == z3.unsat self.solver.pop() self.solver.push() self.solver.add(z3.And(ctrl_ones, trgtvar[0])) sol2 = self.solver.check() == z3.unsat trivial = sol1 or sol2 else: self.solver.add(z3.And(ctrl_ones, z3.Not(triv_cond))) trivial = self.solver.check() == z3.unsat self.solver.pop() return trivial def _remove_control(self, gate, ctrlvar, trgtvar): """use z3 sat solver to determine if all control qubits are in 1 state, and if so replace the Controlled - U by U. Args: gate (Gate): gate to inspect ctrlvar (list(BoolRef)): z3 variables corresponding to latest state of control qubits trgtvar (list(BoolRef)): z3 variables corresponding to latest state of target qubits Returns: Tuple(bool, DAGCircuit, List):: * bool:if controlled gate can be replaced. * DAGCircuit: with U applied to the target qubits. * List: with indices of target qubits. """ remove = False qarg = QuantumRegister(gate.num_qubits) dag = DAGCircuit() dag.add_qreg(qarg) qb = list(range(len(ctrlvar), gate.num_qubits)) # last qubits as target. if isinstance(gate, ControlledGate): remove = self._check_removal(ctrlvar) # try with other qubit as 'target'. if isinstance(gate, (CZGate, CU1Gate, MCU1Gate)): while not remove and qb[0] > 0: qb[0] = qb[0] - 1 ctrl_vars = ctrlvar[: qb[0]] + ctrlvar[qb[0] + 1 :] + trgtvar remove = self._check_removal(ctrl_vars) if remove: qubits = [qarg[qi] for qi in qb] dag.apply_operation_back(gate.base_gate, qubits) return remove, dag, qb def _check_removal(self, ctrlvar): import z3 ctrl_ones = z3.And(*ctrlvar) self.solver.push() self.solver.add(z3.Not(ctrl_ones)) remove = self.solver.check() == z3.unsat self.solver.pop() return remove def _traverse_dag(self, dag): """traverse DAG in topological order for each gate check: if any control is 0, or if triviality conditions are satisfied if yes remove gate from dag apply postconditions of gate Args: dag (DAGCircuit): input DAG to optimize in place """ import z3 for node in dag.topological_op_nodes(): gate = node.op ctrlqb, ctrlvar, trgtqb, trgtvar = self._seperate_ctrl_trgt(node) ctrl_ones = z3.And(*ctrlvar) remove_ctrl, new_dag, qb_idx = self._remove_control(gate, ctrlvar, trgtvar) if remove_ctrl: dag.substitute_node_with_dag(node, new_dag) gate = gate.base_gate node.op = gate node.name = gate.name node.qargs = tuple((ctrlqb + trgtqb)[qi] for qi in qb_idx) _, ctrlvar, trgtqb, trgtvar = self._seperate_ctrl_trgt(node) ctrl_ones = z3.And(*ctrlvar) trivial = self._test_gate(gate, ctrl_ones, trgtvar) if trivial: dag.remove_op_node(node) elif self.size > 1: for qbt in node.qargs: self.gatecache[qbt].append(node) self.varnum[qbt][node] = self.gatenum[qbt] - 1 for qbt in node.qargs: if len(self.gatecache[qbt]) >= self.size: self._multigate_opt(dag, qbt) self._add_postconditions(gate, ctrl_ones, trgtqb, trgtvar) def _remove_successive_identity(self, dag, qubit, from_idx=None): """remove gates that have the same set of target qubits, follow each other immediately on these target qubits, and combine to the identity (consider sequences of length 2 for now) Args: dag (DAGCircuit): the directed acyclic graph to run on. qubit (Qubit): qubit cache to inspect from_idx (int): only gates whose indexes in the cache are larger than this value can be removed """ i = 0 while i < len(self.gatecache[qubit]) - 1: append = True node1 = self.gatecache[qubit][i] node2 = self.gatecache[qubit][i + 1] trgtqb1 = self._seperate_ctrl_trgt(node1)[2] trgtqb2 = self._seperate_ctrl_trgt(node2)[2] i += 1 if trgtqb1 != trgtqb2: continue try: for qbt in trgtqb1: idx = self.gatecache[qbt].index(node1) if self.gatecache[qbt][idx + 1] is not node2: append = False except (IndexError, ValueError): continue seq = [node1, node2] if append and self._is_identity(seq) and self._seq_as_one(seq): i += 1 for node in seq: dag.remove_op_node(node) if from_idx is None or self.gatecache[qubit].index(node) > from_idx: for qbt in node.qargs: self.gatecache[qbt].remove(node) def _is_identity(self, sequence): """determine whether the sequence of gates combines to the identity (consider sequences of length 2 for now) Args: sequence (list(DAGOpNode)): gate sequence to inspect Returns: bool: if gate sequence combines to identity """ assert len(sequence) == 2 # some Instructions (e.g measurements) may not have an inverse. try: gate1, gate2 = sequence[0].op, sequence[1].op.inverse() except CircuitError: return False par1, par2 = gate1.params, gate2.params def1, def2 = gate1.definition, gate2.definition if isinstance(gate1, ControlledGate): gate1 = gate1.base_gate gate1 = type(gate1) if isinstance(gate2, ControlledGate): gate2 = gate2.base_gate gate2 = type(gate2) # equality of gates can be determined via type and parameters, unless # the gates have no specific type, in which case definition is used # or they are unitary gates, in which case matrix equality is used if gate1 is Gate and gate2 is Gate: return def1 == def2 and def1 and def2 elif gate1 is UnitaryGate and gate2 is UnitaryGate: return matrix_equal(par1[0], par2[0], ignore_phase=True) return gate1 == gate2 and par1 == par2 def _seq_as_one(self, sequence): """use z3 solver to determine if the gates in the sequence are either all executed or none of them are executed, based on control qubits (consider sequences of length 2 for now) Args: sequence (list(DAGOpNode)): gate sequence to inspect Returns: bool: if gate sequence is only executed completely or not at all """ from z3 import Or, And, Not import z3 assert len(sequence) == 2 ctrlvar1 = self._seperate_ctrl_trgt(sequence[0])[1] ctrlvar2 = self._seperate_ctrl_trgt(sequence[1])[1] self.solver.push() self.solver.add( Or(And(And(*ctrlvar1), Not(And(*ctrlvar2))), And(Not(And(*ctrlvar1)), And(*ctrlvar2))) ) res = self.solver.check() == z3.unsat self.solver.pop() return res def _multigate_opt(self, dag, qubit, max_idx=None, dnt_rec=None): """ Args: dag (DAGCircuit): the directed acyclic graph to run on. qubit (Qubit): qubit whose gate cache is to be optimized max_idx (int): a value indicates a recursive call, optimize and remove gates up to this point in the cache dnt_rec (list(int)): don't recurse on these qubit caches (again) """ if not self.gatecache[qubit]: return # try to optimize this qubit's pipeline self._remove_successive_identity(dag, qubit, max_idx) if len(self.gatecache[qubit]) < self.size and max_idx is None: # unless in a rec call, we are done if the cache isn't full return elif max_idx is None: # need to remove at least one gate from cache, so remove oldest max_idx = 0 dnt_rec = set() dnt_rec.add(qubit) gates_tbr = [self.gatecache[qubit][0]] else: # need to remove all gates up to max_idx (in reverse order) gates_tbr = self.gatecache[qubit][max_idx::-1] for node in gates_tbr: # for rec call, only look at qubits that haven't been optimized yet new_qb = [x for x in node.qargs if x not in dnt_rec] dnt_rec.update(new_qb) for qbt in new_qb: idx = self.gatecache[qbt].index(node) # recursive chain to optimize all gates in this qubit's cache self._multigate_opt(dag, qbt, max_idx=idx, dnt_rec=dnt_rec) # truncate gatecache for this qubit to after above gate self.gatecache[qubit] = self.gatecache[qubit][max_idx + 1 :] def _seperate_ctrl_trgt(self, node): """Get the target qubits and control qubits if available, as well as their respective z3 variables. """ gate = node.op if isinstance(gate, ControlledGate): numctrl = gate.num_ctrl_qubits else: numctrl = 0 ctrlqb = node.qargs[:numctrl] trgtqb = node.qargs[numctrl:] try: ctrlvar = [self.variables[qb][self.varnum[qb][node]] for qb in ctrlqb] trgtvar = [self.variables[qb][self.varnum[qb][node]] for qb in trgtqb] except KeyError: ctrlvar = [self.variables[qb][-1] for qb in ctrlqb] trgtvar = [self.variables[qb][-1] for qb in trgtqb] return (ctrlqb, ctrlvar, trgtqb, trgtvar) def _reset(self): """Reset HoareOptimize internal state, so it can be run multiple times. """ import z3 self.solver = z3.Solver() self.variables = {} self.gatenum = {} self.gatecache = {} self.varnum = {}
[Doku] def run(self, dag): """ Args: dag (DAGCircuit): the directed acyclic graph to run on. Returns: DAGCircuit: Transformed DAG. """ self._reset() self._initialize(dag) self._traverse_dag(dag) if self.size > 1: for qbt in dag.qubits: self._multigate_opt(dag, qbt) return dag