Source code for qiskit.aqua.algorithms.classifiers.vqc

# -*- coding: utf-8 -*-

# This code is part of Qiskit.
#
# (C) Copyright IBM 2018, 2020.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""The Variational Quantum Classifier algorithm."""

from typing import Optional, Callable, Dict, Union
import warnings
import logging
import math
import numpy as np

from sklearn.utils import shuffle
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
from qiskit.circuit import ParameterVector, ParameterExpression

from qiskit.providers import BaseBackend
from qiskit.aqua import QuantumInstance, AquaError
from qiskit.aqua.utils import map_label_to_class_name
from qiskit.aqua.utils import split_dataset_to_data_and_labels
from qiskit.aqua.algorithms import VQAlgorithm
from qiskit.aqua.components.optimizers import Optimizer
from qiskit.aqua.components.feature_maps import FeatureMap, RawFeatureVector
from qiskit.aqua.components.variational_forms import VariationalForm

logger = logging.getLogger(__name__)

# pylint: disable=invalid-name



class VQC(VQAlgorithm):
    """The Variational Quantum Classifier algorithm.

    Similar to :class:`QSVM`, the VQC algorithm also applies to classification problems.
    VQC uses the variational method to solve such problems in a quantum processor.
    Specifically, it optimizes a parameterized quantum circuit to provide a solution that
    cleanly separates the data.

    .. note::

        The VQC stores the parameters of `var_form` and `feature_map` sorted by name to map
        the values provided by the optimizer to the circuit. This is done to ensure
        reproducible results, for example such that running the optimization twice with the
        same random seeds yields the same result.

    """

    def __init__(
            self,
            optimizer: Optimizer,
            feature_map: Union[QuantumCircuit, FeatureMap],
            var_form: Union[QuantumCircuit, VariationalForm],
            training_dataset: Dict[str, np.ndarray],
            test_dataset: Optional[Dict[str, np.ndarray]] = None,
            datapoints: Optional[np.ndarray] = None,
            max_evals_grouped: int = 1,
            minibatch_size: int = -1,
            callback: Optional[Callable[[int, np.ndarray, float, int], None]] = None,
            quantum_instance: Optional[Union[QuantumInstance, BaseBackend]] = None) -> None:
        """
        Args:
            optimizer: The classical optimizer to use.
            feature_map: The FeatureMap instance to use.
            var_form: The variational form instance.
            training_dataset: The training dataset, in the format
                {'A': np.ndarray, 'B': np.ndarray, ...}.
            test_dataset: The test dataset, in the same format as `training_dataset`.
            datapoints: N x D array, where N is the number of data points and D is the data
                dimension.
            max_evals_grouped: The maximum number of evaluations to perform simultaneously.
            minibatch_size: The size of a mini-batch.
            callback: A callback that can access the intermediate data during the
                optimization. Four values are passed to the callback during each evaluation:
                the evaluation count, the parameters of the variational form, the evaluated
                value, and the index of the data batch.
            quantum_instance: The quantum instance or backend to run the circuits.

        Note:
            We use `label` to denote numeric results and `class` to denote
            the class names (str).

        Raises:
            AquaError: Missing feature map or missing training dataset.
        """
        # VariationalForm is not deprecated on the level of the VQAlgorithm yet, as UCCSD
        # still derives from it, therefore we're adding a warning here
        if isinstance(var_form, VariationalForm):
            warnings.warn("""
            The {} object as input for the VQC is deprecated as of 0.7.0 and will
            be removed no earlier than 3 months after the release.
            You should pass a QuantumCircuit object instead.
            See also qiskit.circuit.library.n_local for a collection of suitable
            circuits.""".format(type(var_form)),
                          DeprecationWarning, stacklevel=2)

        super().__init__(
            var_form=var_form,
            optimizer=optimizer,
            cost_fn=self._cost_function_wrapper,
            quantum_instance=quantum_instance
        )
        self._batches = None
        self._label_batches = None
        self._batch_index = None
        self._eval_time = None
        self.batch_num = None
        self._optimizer.set_max_evals_grouped(max_evals_grouped)
        self._callback = callback

        if feature_map is None:
            raise AquaError('Missing feature map.')
        if training_dataset is None:
            raise AquaError('Missing training dataset.')
        self._training_dataset, self._class_to_label = split_dataset_to_data_and_labels(
            training_dataset)
        self._label_to_class = {label: class_name for class_name, label
                                in self._class_to_label.items()}
        self._num_classes = len(list(self._class_to_label.keys()))

        if test_dataset is not None:
            self._test_dataset = split_dataset_to_data_and_labels(test_dataset,
                                                                  self._class_to_label)
        else:
            self._test_dataset = test_dataset

        if datapoints is not None and not isinstance(datapoints, np.ndarray):
            datapoints = np.asarray(datapoints)
            if len(datapoints) == 0:  # pylint: disable=len-as-condition
                datapoints = None
        self._datapoints = datapoints
        self._minibatch_size = minibatch_size

        self._eval_count = 0
        self._ret = {}
        self._parameterized_circuits = None

        self.feature_map = feature_map
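
    # A minimal usage sketch (kept as a comment so the module stays importable).
    # The feature map and variational form below are illustrative choices from
    # qiskit.circuit.library, and the random dataset is a stand-in for real data
    # in the {'A': np.ndarray, 'B': np.ndarray} format described above:
    #
    #     from qiskit import BasicAer
    #     from qiskit.circuit.library import ZZFeatureMap, TwoLocal
    #     from qiskit.aqua import QuantumInstance
    #     from qiskit.aqua.components.optimizers import COBYLA
    #
    #     training_data = {'A': np.random.rand(20, 2), 'B': np.random.rand(20, 2)}
    #     vqc = VQC(optimizer=COBYLA(maxiter=100),
    #               feature_map=ZZFeatureMap(feature_dimension=2),
    #               var_form=TwoLocal(2, 'ry', 'cz', reps=3),
    #               training_dataset=training_data,
    #               quantum_instance=QuantumInstance(
    #                   BasicAer.get_backend('qasm_simulator'), shots=1024))
    #     result = vqc.run()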

    def construct_circuit(self, x, theta, measurement=False):
        """Construct a circuit from the data and the parameters of the variational form.

        Args:
            x (numpy.ndarray): 1-D array of dimension D
            theta (list[numpy.ndarray]): list of 1-D arrays, parameter sets for the
                variational form
            measurement (bool): flag to add measurement

        Returns:
            QuantumCircuit: the circuit

        Raises:
            AquaError: If ``x`` and ``theta`` share parameters with the same name.
        """
        # check that x and theta do not have parameters of the same name
        x_names = [param.name for param in x if isinstance(param, ParameterExpression)]
        theta_names = [param.name for param in theta if isinstance(param, ParameterExpression)]
        if any(x_name in theta_names for x_name in x_names):
            raise AquaError('Variational form and feature map are not allowed to share '
                            'parameters with the same name!')

        qr = QuantumRegister(self._num_qubits, name='q')
        cr = ClassicalRegister(self._num_qubits, name='c')
        qc = QuantumCircuit(qr, cr)

        if isinstance(self.feature_map, QuantumCircuit):
            param_dict = dict(zip(self._feature_map_params, x))
            circuit = self._feature_map.assign_parameters(param_dict, inplace=False)
            qc.append(circuit.to_instruction(), qr)
        else:
            qc += self._feature_map.construct_circuit(x, qr)

        if isinstance(self.var_form, QuantumCircuit):
            param_dict = dict(zip(self._var_form_params, theta))
            circuit = self._var_form.assign_parameters(param_dict, inplace=False)
            qc.append(circuit.to_instruction(), qr)
        else:
            qc += self._var_form.construct_circuit(theta, qr)

        if measurement:
            qc.barrier(qr)
            qc.measure(qr, cr)
        return qc
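
    # For illustration, with the 2-qubit setup sketched above, construct_circuit
    # binds a data vector to the feature-map parameters and theta to the
    # variational-form parameters (the values below are hypothetical):
    #
    #     qc = vqc.construct_circuit(x=np.array([0.1, 0.2]),
    #                                theta=np.zeros(vqc.var_form.num_parameters),
    #                                measurement=True)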

    def _get_prediction(self, data, theta):
        """Make a prediction on the data for each theta.

        Args:
            data (numpy.ndarray): 2-D array, N x D, with N data points, each of dimension D
            theta (list[numpy.ndarray]): list of 1-D arrays, parameter sets for the
                variational form

        Returns:
            Union(numpy.ndarray or [numpy.ndarray], numpy.ndarray or [numpy.ndarray]):
                list of N x K arrays, list of N x 1 arrays
        """
        circuits = []

        num_theta_sets = len(theta) // self._var_form.num_parameters
        theta_sets = np.split(theta, num_theta_sets)

        def _build_parameterized_circuits():
            var_form_support = isinstance(self._var_form, QuantumCircuit) \
                or self._var_form.support_parameterized_circuit
            feat_map_support = isinstance(self._feature_map, QuantumCircuit) \
                or self._feature_map.support_parameterized_circuit

            # if supported, transpile the parameterized circuit once up front and
            # only bind parameter values for each data point and theta afterwards
            if var_form_support and feat_map_support and self._parameterized_circuits is None:
                parameterized_circuits = self.construct_circuit(
                    self._feature_map_params, self._var_form_params,
                    measurement=not self._quantum_instance.is_statevector)
                self._parameterized_circuits = \
                    self._quantum_instance.transpile(parameterized_circuits)[0]

        _build_parameterized_circuits()
        for thet in theta_sets:
            for datum in data:
                if self._parameterized_circuits is not None:
                    curr_params = dict(zip(self._feature_map_params, datum))
                    curr_params.update(dict(zip(self._var_form_params, thet)))
                    circuit = self._parameterized_circuits.assign_parameters(curr_params)
                else:
                    circuit = self.construct_circuit(
                        datum, thet,
                        measurement=not self._quantum_instance.is_statevector)
                circuits.append(circuit)

        results = self._quantum_instance.execute(
            circuits, had_transpiled=self._parameterized_circuits is not None)

        circuit_id = 0
        predicted_probs = []
        predicted_labels = []
        for _ in theta_sets:
            counts = []
            for _ in data:
                if self._quantum_instance.is_statevector:
                    temp = results.get_statevector(circuit_id)
                    outcome_vector = (temp * temp.conj()).real
                    # convert outcome_vector to outcome_dict, where a key
                    # is a basis state and a value is the count.
                    # Note: the count can be scaled linearly, i.e.,
                    # it does not have to be an integer.
                    outcome_dict = {}
                    bitstr_size = int(math.log2(len(outcome_vector)))
                    for i, _ in enumerate(outcome_vector):
                        bitstr_i = format(i, '0' + str(bitstr_size) + 'b')
                        outcome_dict[bitstr_i] = outcome_vector[i]
                else:
                    outcome_dict = results.get_counts(circuit_id)

                counts.append(outcome_dict)
                circuit_id += 1

            probs = return_probabilities(counts, self._num_classes)
            predicted_probs.append(probs)
            predicted_labels.append(np.argmax(probs, axis=1))

        if len(predicted_probs) == 1:
            predicted_probs = predicted_probs[0]
        if len(predicted_labels) == 1:
            predicted_labels = predicted_labels[0]

        return predicted_probs, predicted_labels

    # Breaks data into minibatches. Labels are optional,
    # but will be broken into batches if included.
    def batch_data(self, data, labels=None, minibatch_size=-1):
        """Batch the data, and batch the labels alongside it if they are provided."""
        label_batches = None

        if 0 < minibatch_size < len(data):
            batch_size = min(minibatch_size, len(data))
            # np.array_split takes the *number* of chunks, not the chunk size,
            # so derive the number of batches from the requested mini-batch size
            num_batches = int(np.ceil(len(data) / batch_size))
            if labels is not None:
                shuffled_samples, shuffled_labels = shuffle(data, labels,
                                                            random_state=self.random)
                label_batches = np.array_split(shuffled_labels, num_batches)
            else:
                shuffled_samples = shuffle(data, random_state=self.random)
            batches = np.array_split(shuffled_samples, num_batches)
        else:
            batches = np.asarray([data])
            label_batches = np.asarray([labels])
        return batches, label_batches
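
    # Note on the batching above: np.array_split(a, n) produces n nearly equal
    # chunks, so the number of batches is derived from the requested size. With
    # illustrative values, 10 samples and minibatch_size=4 give ceil(10/4) = 3
    # batches of sizes 4, 3 and 3:
    #
    #     np.array_split(np.arange(10), 3)
    #     # -> [array([0, 1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]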

    def is_gradient_really_supported(self):
        """Returns True if the gradient is supported by the optimizer and not ignored."""
        return self.optimizer.is_gradient_supported and not self.optimizer.is_gradient_ignored

    def train(self, data, labels, quantum_instance=None, minibatch_size=-1):
        """Train the model and save the results.

        Args:
            data (numpy.ndarray): N x D array, where N is the number of data points and D is
                the dimension
            labels (numpy.ndarray): N x 1 array, where N is the number of data points
            quantum_instance (QuantumInstance): quantum backend with all settings
            minibatch_size (int): the size of each mini-batch
        """
        self._quantum_instance = \
            self._quantum_instance if quantum_instance is None else quantum_instance
        minibatch_size = minibatch_size if minibatch_size > 0 else self._minibatch_size
        self._batches, self._label_batches = self.batch_data(data, labels, minibatch_size)
        self._batch_index = 0

        if self.initial_point is None:
            self.initial_point = self.random.randn(self._var_form.num_parameters)

        self._eval_count = 0

        grad_fn = None
        if minibatch_size > 0 and self.is_gradient_really_supported():  # we need some wrapper
            grad_fn = self._gradient_function_wrapper

        result = self.find_minimum(initial_point=self.initial_point,
                                   var_form=self.var_form,
                                   cost_fn=self._cost_function_wrapper,
                                   optimizer=self.optimizer,
                                   gradient_fn=grad_fn)

        # TODO remove - mimics former VQAlgorithm result dict so it can be extended
        self._ret = {}
        self._ret['num_optimizer_evals'] = result.optimizer_evals
        self._ret['min_val'] = result.optimal_value
        self._ret['opt_params'] = result.optimal_point
        self._ret['eval_time'] = result.optimizer_time

        if self._ret['num_optimizer_evals'] is not None and \
                self._eval_count >= self._ret['num_optimizer_evals']:
            self._eval_count = self._ret['num_optimizer_evals']
        self._eval_time = self._ret['eval_time']
        logger.info('Optimization complete in %s seconds.\nFound opt_params %s in %s evals',
                    self._eval_time, self._ret['opt_params'], self._eval_count)
        self._ret['eval_count'] = self._eval_count

        del self._batches
        del self._label_batches
        del self._batch_index

        self._ret['training_loss'] = self._ret['min_val']

    # temporary fix: this code should be unified with the gradient api in optimizer.py
    def _gradient_function_wrapper(self, theta):
        """Compute and return the gradient at the point theta.

        Args:
            theta (numpy.ndarray): 1-D array

        Returns:
            numpy.ndarray: the computed gradient, a 1-D array with the same shape as theta
        """
        epsilon = 1e-8
        f_orig = self._cost_function_wrapper(theta)
        grad = np.zeros((len(theta),), float)
        # forward finite difference: grad[k] ~= (f(theta + eps * e_k) - f(theta)) / eps
        for k, _ in enumerate(theta):
            theta[k] += epsilon
            f_new = self._cost_function_wrapper(theta)
            grad[k] = (f_new - f_orig) / epsilon
            theta[k] -= epsilon  # recover to the center state
        if self.is_gradient_really_supported():
            self._batch_index += 1  # increment the batch after the gradient callback
        return grad

    def _cost_function_wrapper(self, theta):
        batch_index = self._batch_index % len(self._batches)
        predicted_probs, _ = self._get_prediction(self._batches[batch_index], theta)
        total_cost = []
        if not isinstance(predicted_probs, list):
            predicted_probs = [predicted_probs]
        for i, _ in enumerate(predicted_probs):
            curr_cost = cost_estimate(predicted_probs[i], self._label_batches[batch_index])
            total_cost.append(curr_cost)
            if self._callback is not None:
                self._callback(
                    self._eval_count,
                    theta[i * self._var_form.num_parameters:(i + 1) *
                          self._var_form.num_parameters],
                    curr_cost,
                    self._batch_index
                )
            self._eval_count += 1
        if not self.is_gradient_really_supported():
            self._batch_index += 1  # increment the batch after the eval callback

        logger.debug('Intermediate batch cost: %s', sum(total_cost))
        return total_cost if len(total_cost) > 1 else total_cost[0]

    def test(self, data, labels, quantum_instance=None, minibatch_size=-1, params=None):
        """Predict the labels for the data, and test against the ground-truth labels.

        Args:
            data (numpy.ndarray): N x D array, where N is the number of data points and D is
                the data dimension
            labels (numpy.ndarray): N x 1 array, where N is the number of data points
            quantum_instance (QuantumInstance): quantum backend with all settings
            minibatch_size (int): the size of each mini-batch
            params (list): list of parameters to populate in the variational form

        Returns:
            float: classification accuracy
        """
        # minibatch size defaults to setting in instance variable if not set
        minibatch_size = minibatch_size if minibatch_size > 0 else self._minibatch_size

        batches, label_batches = self.batch_data(data, labels, minibatch_size)
        self.batch_num = 0
        if params is None:
            params = self.optimal_params
        total_cost = 0
        total_correct = 0
        total_samples = 0

        self._quantum_instance = \
            self._quantum_instance if quantum_instance is None else quantum_instance
        for batch, label_batch in zip(batches, label_batches):
            predicted_probs, _ = self._get_prediction(batch, params)
            total_cost += cost_estimate(predicted_probs, label_batch)
            total_correct += np.sum((np.argmax(predicted_probs, axis=1) == label_batch))
            total_samples += label_batch.shape[0]
            int_accuracy = \
                np.sum((np.argmax(predicted_probs, axis=1) == label_batch)) / \
                label_batch.shape[0]
            logger.debug('Intermediate batch accuracy: %.2f%%', int_accuracy * 100.0)
        total_accuracy = total_correct / total_samples
        logger.info('Accuracy is %.2f%%', total_accuracy * 100.0)

        self._ret['testing_accuracy'] = total_accuracy
        self._ret['test_success_ratio'] = total_accuracy
        self._ret['testing_loss'] = total_cost / len(batches)
        return total_accuracy

    def predict(self, data, quantum_instance=None, minibatch_size=-1, params=None):
        """Predict the labels for the data.

        Args:
            data (numpy.ndarray): N x D array, where N is the number of data points and D is
                the data dimension
            quantum_instance (QuantumInstance): quantum backend with all settings
            minibatch_size (int): the size of each mini-batch
            params (list): list of parameters to populate in the variational form

        Returns:
            list: for each data point, the predicted probability for each class
            list: for each data point, the predicted label (the one with the highest
                probability)
        """
        # minibatch size defaults to setting in instance variable if not set
        minibatch_size = minibatch_size if minibatch_size > 0 else self._minibatch_size
        batches, _ = self.batch_data(data, None, minibatch_size)
        if params is None:
            params = self.optimal_params
        predicted_probs = None
        predicted_labels = None

        self._quantum_instance = \
            self._quantum_instance if quantum_instance is None else quantum_instance
        for i, batch in enumerate(batches):
            logger.debug('Predicting batch %s', i)
            batch_probs, batch_labels = self._get_prediction(batch, params)
            # explicit None checks: numpy arrays raise on boolean coercion
            if predicted_probs is None and predicted_labels is None:
                predicted_probs = batch_probs
                predicted_labels = batch_labels
            else:
                predicted_probs = np.concatenate((predicted_probs, batch_probs))
                predicted_labels = np.concatenate((predicted_labels, batch_labels))
        self._ret['predicted_probs'] = predicted_probs
        self._ret['predicted_labels'] = predicted_labels
        return predicted_probs, predicted_labels
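
    # After training, prediction can be run on new points, e.g. with hypothetical
    # 2-feature inputs matching the sketch in __init__:
    #
    #     probs, labels = vqc.predict(np.array([[0.2, 0.4], [0.8, 0.1]]))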

    def _run(self):
        self.train(self._training_dataset[0], self._training_dataset[1])

        if self._test_dataset is not None:
            self.test(self._test_dataset[0], self._test_dataset[1])

        if self._datapoints is not None:
            _, predicted_labels = self.predict(self._datapoints)
            self._ret['predicted_classes'] = map_label_to_class_name(predicted_labels,
                                                                     self._label_to_class)
        self.cleanup_parameterized_circuits()
        return self._ret

    def get_optimal_cost(self):
        """Get the optimal cost."""
        if 'opt_params' not in self._ret:
            raise AquaError("Cannot return optimal cost before running the "
                            "algorithm to find optimal params.")
        return self._ret['min_val']

    def get_optimal_circuit(self):
        """Get the optimal circuit."""
        if 'opt_params' not in self._ret:
            raise AquaError("Cannot find optimal circuit before running "
                            "the algorithm to find optimal params.")
        if isinstance(self._var_form, QuantumCircuit):
            param_dict = dict(zip(self._var_form_params, self._ret['opt_params']))
            return self._var_form.assign_parameters(param_dict)
        return self._var_form.construct_circuit(self._ret['opt_params'])

    def get_optimal_vector(self):
        """Get the optimal vector."""
        # pylint: disable=import-outside-toplevel
        from qiskit.aqua.utils.run_circuits import find_regs_by_name

        if 'opt_params' not in self._ret:
            raise AquaError("Cannot find optimal vector before running "
                            "the algorithm to find optimal params.")
        qc = self.get_optimal_circuit()
        if self._quantum_instance.is_statevector:
            ret = self._quantum_instance.execute(qc)
            self._ret['min_vector'] = ret.get_statevector(qc, decimals=16)
        else:
            c = ClassicalRegister(qc.width(), name='c')
            q = find_regs_by_name(qc, 'q')
            qc.add_register(c)
            qc.barrier(q)
            qc.measure(q, c)
            ret = self._quantum_instance.execute(qc)
            self._ret['min_vector'] = ret.get_counts(qc)
        return self._ret['min_vector']

    @property
    def feature_map(self) -> Optional[Union[FeatureMap, QuantumCircuit]]:
        """Return the feature map."""
        return self._feature_map

    @feature_map.setter
    def feature_map(self, feature_map: Union[FeatureMap, QuantumCircuit]):
        """Set the feature map.

        Also sets the number of qubits, the internally stored feature map parameters and,
        if the feature map is a circuit, the order of the parameters.
        """
        if isinstance(feature_map, QuantumCircuit):
            # patch the feature dimension to the circuit
            feature_map.feature_dimension = len(feature_map.parameters)

            # store the parameters sorted by name to keep the mapping reproducible
            self._num_qubits = feature_map.num_qubits
            self._feature_map_params = sorted(feature_map.parameters, key=lambda p: p.name)
            self._feature_map = feature_map
        elif isinstance(feature_map, FeatureMap):
            # raw feature vector is not yet replaced
            if not isinstance(feature_map, RawFeatureVector):
                warnings.warn('The qiskit.aqua.components.feature_maps.FeatureMap object is '
                              'deprecated as of 0.7.0 and will be removed no earlier than 3 '
                              'months after the release. You should pass a QuantumCircuit '
                              'object instead. See also qiskit.circuit.library.data_preparation '
                              'for a collection of suitable circuits.',
                              DeprecationWarning, stacklevel=2)
            self._num_qubits = feature_map.num_qubits
            self._feature_map_params = ParameterVector('x', length=feature_map.feature_dimension)
            self._feature_map = feature_map
        else:
            raise ValueError('Unsupported type {} of feature_map.'.format(type(feature_map)))

        if self._feature_map.feature_dimension == 0:
            warnings.warn('The feature map has no parameters that can be optimized to represent '
                          'the data. This will most likely cause the VQC to fail.')

    @property
    def optimal_params(self):
        """Return the optimal parameters."""
        if 'opt_params' not in self._ret:
            raise AquaError("Cannot find optimal params before running the algorithm.")
        return self._ret['opt_params']

    @property
    def ret(self):
        """Return the result."""
        return self._ret

    @ret.setter
    def ret(self, new_value):
        """Set the result."""
        self._ret = new_value

    @property
    def label_to_class(self):
        """Return the label-to-class mapping."""
        return self._label_to_class

    @property
    def class_to_label(self):
        """Return the class-to-label mapping."""
        return self._class_to_label

    def load_model(self, file_path):
        """Load the model from a file."""
        model_npz = np.load(file_path, allow_pickle=True)
        self._ret['opt_params'] = model_npz['opt_params']

    def save_model(self, file_path):
        """Save the model to a file."""
        model = {'opt_params': self._ret['opt_params']}
        np.savez(file_path, **model)
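
    # The optimal parameters can be persisted and restored across sessions, e.g.
    # (hypothetical file name):
    #
    #     vqc.save_model('vqc_model')   # np.savez appends .npz if it is missing
    #     vqc.load_model('vqc_model.npz')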

    @property
    def test_dataset(self):
        """Return the test dataset."""
        return self._test_dataset

    @property
    def training_dataset(self):
        """Return the training dataset."""
        return self._training_dataset

    @property
    def datapoints(self):
        """Return the data points."""
        return self._datapoints

def assign_label(measured_key, num_classes):
    """Assign a class label to a measured bitstring.

    Classes = 2:
    - If there is an odd number of qubits, we use majority vote.
    - If there is an even number of qubits, we use parity.

    Classes = 3:
    - We use part-parity.
      E.g. for 2 qubits, [00], [01, 10], [11] would be the three labels.

    Args:
        measured_key (str): the measured key
        num_classes (int): the number of classes

    Returns:
        int: the key order
    """
    measured_key = np.asarray([int(k) for k in list(measured_key)])
    num_qubits = len(measured_key)
    if num_classes == 2:
        if num_qubits % 2 != 0:
            total = np.sum(measured_key)
            return 1 if total > num_qubits / 2 else 0
        else:
            hamming_weight = np.sum(measured_key)
            is_odd_parity = hamming_weight % 2
            return is_odd_parity
    elif num_classes == 3:
        first_half = int(np.floor(num_qubits / 2))
        modulo = num_qubits % 2
        # first half of the key
        hamming_weight_1 = np.sum(measured_key[0:first_half + modulo])
        # second half of the key
        hamming_weight_2 = np.sum(measured_key[first_half + modulo:])
        is_odd_parity_1 = hamming_weight_1 % 2
        is_odd_parity_2 = hamming_weight_2 % 2

        return is_odd_parity_1 + is_odd_parity_2
    else:
        total_size = 2 ** num_qubits
        class_step = np.floor(total_size / num_classes)

        decimal_value = measured_key.dot(1 << np.arange(measured_key.shape[-1] - 1, -1, -1))
        key_order = int(decimal_value / class_step)
        return key_order if key_order < num_classes else num_classes - 1


def cost_estimate(probs, gt_labels, shots=None):  # pylint: disable=unused-argument
    """Calculate the cross entropy.

    Args:
        probs (numpy.ndarray): N x K array, where N is the number of data points and K is
            the number of classes
        gt_labels (numpy.ndarray): N x 1 array
        shots (int): the number of shots used in quantum computing

    Returns:
        float: the cross entropy loss between the estimated probs and gt_labels

    Note:
        shots is kept since it may be needed in the future.
    """
    # one-hot encode the ground-truth labels
    mylabels = np.zeros(probs.shape)
    for i in range(gt_labels.shape[0]):
        whichindex = gt_labels[i]
        mylabels[i][whichindex] = 1

    def cross_entropy(predictions, targets, epsilon=1e-12):
        # CE = -(1/N) * sum_i sum_k targets[i, k] * log(predictions[i, k])
        predictions = np.clip(predictions, epsilon, 1. - epsilon)
        N = predictions.shape[0]
        tmp = np.sum(targets * np.log(predictions), axis=1)
        ce = -np.sum(tmp) / N
        return ce

    x = cross_entropy(probs, mylabels)
    return x


def cost_estimate_sigmoid(shots, probs, gt_labels):
    """Calculate the sigmoid cross entropy.

    Args:
        shots (int): the number of shots used in quantum computing
        probs (numpy.ndarray): N x K array, where N is the number of data points and K is
            the number of classes
        gt_labels (numpy.ndarray): N x 1 array

    Returns:
        float: the sigmoid cross entropy loss between the estimated probs and gt_labels
    """
    # Error in the order of parameters corrected below - 19 Dec 2018
    # x = cost_estimate(shots, probs, gt_labels)
    x = cost_estimate(probs, gt_labels, shots)
    loss = (1.) / (1. + np.exp(-x))
    return loss


def return_probabilities(counts, num_classes):
    """Return the per-class probabilities for the given measured counts.

    Args:
        counts (list[dict]): N data points, each with a dict recording the counts
        num_classes (int): the number of classes

    Returns:
        numpy.ndarray: N x K array
    """
    probs = np.zeros(((len(counts), num_classes)))
    for idx, _ in enumerate(counts):
        count = counts[idx]
        shots = sum(count.values())
        for k, v in count.items():
            label = assign_label(k, num_classes)
            probs[idx][label] += v / shots
    return probs
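
# A small worked example of the label-assignment and probability helpers above
# (kept as a comment; the numeric values are illustrative, not from the source):
#
#     assign_label('110', 2)  # -> 1: three qubits, two classes, majority vote
#     assign_label('01', 2)   # -> 1: two qubits, two classes, odd parity
#     assign_label('11', 2)   # -> 0: even parity
#     return_probabilities([{'00': 512, '11': 512}], 2)  # -> array([[1., 0.]])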