# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Various ways to divide a DAG into blocks of nodes, to split blocks of nodes
into smaller sub-blocks, and to consolidate blocks."""
from qiskit.circuit import QuantumCircuit, CircuitInstruction, ClassicalRegister
from qiskit.circuit.controlflow import condition_resources
from .dagnode import DAGOpNode
from .dagcircuit import DAGCircuit
from .dagdependency import DAGDependency
from .exceptions import DAGCircuitError
class BlockCollector:
    """This class implements various strategies of dividing a DAG (direct acyclic graph)
    into blocks of nodes that satisfy certain criteria.

    It works both with the :class:`~qiskit.dagcircuit.DAGCircuit` and
    :class:`~qiskit.dagcircuit.DAGDependency` representations of a DAG, where
    DagDependency takes into account commutativity between nodes.

    Collecting nodes from DAGDependency generally leads to more optimal results, but is
    slower, as it requires to construct a DAGDependency beforehand. Thus, DAGCircuit should
    be used with lower transpiler settings, and DAGDependency should be used with higher
    transpiler settings.

    In general, there are multiple ways to collect maximal blocks. The approaches used
    here are of the form 'starting from the input nodes of a DAG, greedily collect
    the largest block of nodes that match certain criteria'. For additional details,
    see https://github.com/Qiskit/qiskit-terra/issues/5775.
    """

    def __init__(self, dag):
        """
        Args:
            dag (Union[DAGCircuit, DAGDependency]): The input DAG.

        Raises:
            DAGCircuitError: the input object is not a DAG.
        """
        self.dag = dag
        # List of nodes whose (remaining) in-degree is 0; populated by _setup_in_degrees.
        self._pending_nodes = None
        # Mapping node -> number of not-yet-collected direct predecessors.
        self._in_degree = None
        # When True, blocks are collected from the outputs towards the inputs,
        # i.e. predecessor/successor roles are swapped.
        self._collect_from_back = False

        if isinstance(dag, DAGCircuit):
            self.is_dag_dependency = False
        elif isinstance(dag, DAGDependency):
            self.is_dag_dependency = True
        else:
            raise DAGCircuitError("not a DAG.")

    def _setup_in_degrees(self):
        """For an efficient implementation, for every node we keep the number of its
        unprocessed immediate predecessors (called ``_in_degree``). This ``_in_degree``
        is set up at the start and updated throughout the algorithm.

        A node is leaf (or input) node iff its ``_in_degree`` is 0.

        When a node is (marked as) collected, the ``_in_degree`` of each of its immediate
        successor is updated by subtracting 1.

        Additionally, ``_pending_nodes`` explicitly keeps the list of nodes whose
        ``_in_degree`` is 0.
        """
        self._pending_nodes = []
        self._in_degree = {}
        for node in self._op_nodes():
            deg = len(self._direct_preds(node))
            self._in_degree[node] = deg
            if deg == 0:
                self._pending_nodes.append(node)

    def _op_nodes(self):
        """Returns DAG (operation) nodes, dispatching on the DAG representation."""
        if not self.is_dag_dependency:
            return self.dag.op_nodes()
        else:
            return self.dag.get_nodes()

    def _direct_preds(self, node):
        """Returns direct predecessors of a node. This function takes into account the
        direction of collecting blocks, that is node's predecessors when collecting
        backwards are the direct successors of a node in the DAG.
        """
        if not self.is_dag_dependency:
            # DAGCircuit neighbors include wire in/out nodes; keep op nodes only.
            if self._collect_from_back:
                return [pred for pred in self.dag.successors(node) if isinstance(pred, DAGOpNode)]
            else:
                return [
                    pred for pred in self.dag.predecessors(node) if isinstance(pred, DAGOpNode)
                ]
        else:
            if self._collect_from_back:
                return [
                    self.dag.get_node(pred_id)
                    for pred_id in self.dag.direct_successors(node.node_id)
                ]
            else:
                return [
                    self.dag.get_node(pred_id)
                    for pred_id in self.dag.direct_predecessors(node.node_id)
                ]

    def _direct_succs(self, node):
        """Returns direct successors of a node. This function takes into account the
        direction of collecting blocks, that is node's successors when collecting
        backwards are the direct predecessors of a node in the DAG.
        """
        if not self.is_dag_dependency:
            # DAGCircuit neighbors include wire in/out nodes; keep op nodes only.
            if self._collect_from_back:
                return [
                    succ for succ in self.dag.predecessors(node) if isinstance(succ, DAGOpNode)
                ]
            else:
                return [succ for succ in self.dag.successors(node) if isinstance(succ, DAGOpNode)]
        else:
            if self._collect_from_back:
                return [
                    self.dag.get_node(succ_id)
                    for succ_id in self.dag.direct_predecessors(node.node_id)
                ]
            else:
                return [
                    self.dag.get_node(succ_id)
                    for succ_id in self.dag.direct_successors(node.node_id)
                ]

    def _have_uncollected_nodes(self):
        """Returns whether there are uncollected (pending) nodes."""
        return bool(self._pending_nodes)

    def collect_matching_block(self, filter_fn):
        """Iteratively collects the largest block of input nodes (that is, nodes with
        ``_in_degree`` equal to 0) that match a given filtering function.

        Examples of this include collecting blocks of swap gates,
        blocks of linear gates (CXs and SWAPs), blocks of Clifford gates, blocks of
        single-qubit gates, blocks of two-qubit gates, etc. Here 'iteratively' means that
        once a node is collected, the ``_in_degree`` of each of its immediate successor is
        decreased by 1, allowing more nodes to become input and to be eligible for
        collecting into the current block.

        Returns the block of collected nodes.
        """
        current_block = []

        # Swap out the pending list: nodes that do not match filter_fn go back into
        # self._pending_nodes for a later collection pass.
        unprocessed_pending_nodes = self._pending_nodes
        self._pending_nodes = []

        # Iteratively process unprocessed_pending_nodes:
        # - any node that does not match filter_fn is added to pending_nodes
        # - any node that match filter_fn is added to the current_block,
        #   and some of its successors may be moved to unprocessed_pending_nodes.
        while unprocessed_pending_nodes:
            new_pending_nodes = []
            for node in unprocessed_pending_nodes:
                if filter_fn(node):
                    current_block.append(node)

                    # update the _in_degree of node's successors
                    for suc in self._direct_succs(node):
                        self._in_degree[suc] -= 1
                        if self._in_degree[suc] == 0:
                            new_pending_nodes.append(suc)
                else:
                    self._pending_nodes.append(node)
            unprocessed_pending_nodes = new_pending_nodes

        return current_block

    def collect_all_matching_blocks(
        self,
        filter_fn,
        split_blocks=True,
        min_block_size=2,
        split_layers=False,
        collect_from_back=False,
    ):
        """Collects all blocks that match a given filtering function filter_fn.
        This iteratively finds the largest block that does not match filter_fn,
        then the largest block that matches filter_fn, and so on, until no more uncollected
        nodes remain. Intuitively, finding larger blocks of non-matching nodes helps to
        find larger blocks of matching nodes later on.

        After the blocks are collected, they can be optionally refined. The option
        ``split_blocks`` allows to split collected blocks into sub-blocks over disjoint
        qubit subsets. The option ``split_layers`` allows to split collected blocks
        into layers of non-overlapping instructions. The option ``min_block_size``
        specifies the minimum number of gates in the block for the block to be collected.

        By default, blocks are collected in the direction from the inputs towards the
        outputs of the circuit. The option ``collect_from_back`` allows to change this
        direction, that is collect blocks from the outputs towards the inputs of the
        circuit.

        Returns the list of matching blocks only.
        """

        def not_filter_fn(node):
            """Returns the opposite of filter_fn."""
            return not filter_fn(node)

        # Note: the collection direction must be specified before setting in-degrees
        self._collect_from_back = collect_from_back
        self._setup_in_degrees()

        # Iteratively collect non-matching and matching blocks.
        matching_blocks = []
        while self._have_uncollected_nodes():
            self.collect_matching_block(not_filter_fn)
            matching_block = self.collect_matching_block(filter_fn)
            if matching_block:
                matching_blocks.append(matching_block)

        # If the option split_layers is set, refine blocks by splitting them into layers
        # of non-overlapping instructions (in other words, into depth-1 sub-blocks).
        if split_layers:
            tmp_blocks = []
            for block in matching_blocks:
                tmp_blocks.extend(split_block_into_layers(block))
            matching_blocks = tmp_blocks

        # If the option split_blocks is set, refine blocks by splitting them into
        # sub-blocks over disconnected qubit subsets.
        if split_blocks:
            tmp_blocks = []
            for block in matching_blocks:
                tmp_blocks.extend(BlockSplitter().run(block))
            matching_blocks = tmp_blocks

        # If we are collecting from the back, both the order of the blocks
        # and the order of nodes in each block should be reversed.
        if self._collect_from_back:
            matching_blocks = [block[::-1] for block in matching_blocks[::-1]]

        # Keep only blocks with at least min_block_sizes.
        matching_blocks = [block for block in matching_blocks if len(block) >= min_block_size]

        return matching_blocks
class BlockSplitter:
    """Splits a block of nodes into sub-blocks over disjoint qubits.

    The implementation is based on the Disjoint Set Union data structure."""

    def __init__(self):
        # Maps a qubit to its DSU group leader (with path compression in find_leader).
        self.leader = {}  # qubit's group leader
        # Maps a leader qubit to the list of nodes collected for its group; only the
        # entry of the current leader is kept populated.
        self.group = {}  # qubit's group

    def find_leader(self, index):
        """Find in DSU.

        Lazily registers ``index`` as its own singleton group on first sight,
        and compresses the path on subsequent lookups.
        """
        if index not in self.leader:
            self.leader[index] = index
            self.group[index] = []
            return index
        if self.leader[index] == index:
            return index
        self.leader[index] = self.find_leader(self.leader[index])
        return self.leader[index]

    def union_leaders(self, index1, index2):
        """Union in DSU.

        Merges the smaller group into the larger one (union by size) to keep
        the DSU trees shallow.
        """
        leader1 = self.find_leader(index1)
        leader2 = self.find_leader(index2)
        if leader1 == leader2:
            return
        if len(self.group[leader1]) < len(self.group[leader2]):
            leader1, leader2 = leader2, leader1

        self.leader[leader2] = leader1
        self.group[leader1].extend(self.group[leader2])
        self.group[leader2].clear()

    def run(self, block):
        """Splits block of nodes into sub-blocks over disjoint qubits."""
        for node in block:
            indices = node.qargs
            if not indices:
                continue
            first = indices[0]
            # Union all qubits of this node into one group, then record the node
            # under that group's leader.
            for index in indices[1:]:
                self.union_leaders(first, index)
            self.group[self.find_leader(first)].append(node)

        # Each qubit that is its own leader owns one final sub-block.
        blocks = []
        for index, lead in self.leader.items():
            if lead == index:
                blocks.append(self.group[index])

        return blocks
def split_block_into_layers(block):
    """Splits a block of nodes into sub-blocks of non-overlapping instructions
    (or, in other words, into depth-1 sub-blocks).
    """
    # For each qubit/clbit, the index of the first layer it is still free in.
    depth_of_bit = {}
    layers = []

    for node in block:
        # All bits touched by this node: its qubits, clbits, and any clbits
        # referenced by a condition on the operation.
        bits = set(node.qargs)
        bits.update(node.cargs)
        condition = getattr(node.op, "condition", None)
        if condition is not None:
            bits.update(condition_resources(condition).clbits)

        # The node goes into the earliest layer in which all of its bits are free.
        layer_index = max(depth_of_bit.get(bit, 0) for bit in bits)
        while len(layers) <= layer_index:
            layers.append([])
        layers[layer_index].append(node)

        # Every touched bit is now busy through this layer.
        for bit in bits:
            depth_of_bit[bit] = layer_index + 1

    return layers
class BlockCollapser:
    """This class implements various strategies of consolidating blocks of nodes
    in a DAG (direct acyclic graph). It works both with
    the :class:`~qiskit.dagcircuit.DAGCircuit`
    and :class:`~qiskit.dagcircuit.DAGDependency` DAG representations.
    """

    def __init__(self, dag):
        """
        Args:
            dag (Union[DAGCircuit, DAGDependency]): The input DAG.
        """
        self.dag = dag

    def collapse_to_operation(self, blocks, collapse_fn):
        """For each block, constructs a quantum circuit containing instructions in
        the block, then uses collapse_fn to collapse this circuit into a single
        operation, and replaces the block in the DAG by this operation.

        Returns the modified DAG (modified in place).
        """
        global_index_map = {wire: idx for idx, wire in enumerate(self.dag.qubits)}
        global_index_map.update({wire: idx for idx, wire in enumerate(self.dag.clbits)})

        for block in blocks:
            # Find the sets of qubits/clbits used in this block (which might be much
            # smaller than the set of all qubits/clbits).
            cur_qubits = set()
            cur_clbits = set()

            # Additionally, find the set of classical registers used in conditions over
            # full registers (in such a case, we need to add that register to the block
            # circuit, not just its clbits).
            cur_clregs = []

            for node in block:
                cur_qubits.update(node.qargs)
                cur_clbits.update(node.cargs)
                cond = getattr(node.op, "condition", None)
                if cond is not None:
                    cur_clbits.update(condition_resources(cond).clbits)
                    if isinstance(cond[0], ClassicalRegister):
                        cur_clregs.append(cond[0])

            # For reproducibility, order these qubits/clbits compatibly with the
            # global order.
            sorted_qubits = sorted(cur_qubits, key=lambda x: global_index_map[x])
            sorted_clbits = sorted(cur_clbits, key=lambda x: global_index_map[x])

            qc = QuantumCircuit(sorted_qubits, sorted_clbits)

            # Add classical registers used in conditions over registers
            for reg in cur_clregs:
                qc.add_register(reg)

            # Construct a quantum circuit from the nodes in the block, remapping
            # the qubits. The map sends each block qubit/clbit to its position in
            # the block-local circuit (keys are distinct bit objects, so the two
            # updates cannot collide).
            wire_pos_map = {qb: ix for ix, qb in enumerate(sorted_qubits)}
            wire_pos_map.update({qb: ix for ix, qb in enumerate(sorted_clbits)})

            for node in block:
                instructions = qc.append(CircuitInstruction(node.op, node.qargs, node.cargs))
                cond = getattr(node.op, "condition", None)
                if cond is not None:
                    instructions.c_if(*cond)

            # Collapse this quantum circuit into an operation.
            op = collapse_fn(qc)

            # Replace the block of nodes in the DAG by the constructed operation
            # (the function replace_block_with_op is implemented both in DAGCircuit
            # and DAGDependency).
            self.dag.replace_block_with_op(block, op, wire_pos_map, cycle_check=False)
        return self.dag