# -*- coding: utf-8 -*-
# This code is part of Qiskit.
#
# (C) Copyright IBM 2020.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""An implementation of the ADMM algorithm."""
import copy
import logging
import time
from typing import List, Optional, Any, Tuple
import numpy as np
from ..exceptions import QiskitOptimizationError
from .cplex_optimizer import CplexOptimizer
from .optimization_algorithm import OptimizationAlgorithm, OptimizationResult
from ..problems.quadratic_program import QuadraticProgram
from ..problems.variable import Variable
from ..problems.constraint import Constraint
from ..problems.quadratic_objective import QuadraticObjective
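# Supported modes for updating the rho parameter (see ADMMParameters.vary_rho):
# either increase rho by 10% at each iteration, or update it based on the
# primal and dual residuals, following Boyd et al.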
UPDATE_RHO_BY_TEN_PERCENT = 0
UPDATE_RHO_BY_RESIDUALS = 1
logger = logging.getLogger(__name__)
class ADMMParameters:
"""Defines a set of parameters for ADMM optimizer."""
def __init__(self, rho_initial: float = 10000, factor_c: float = 100000, beta: float = 1000,
max_iter: int = 10, tol: float = 1.e-4, max_time: float = np.inf,
three_block: bool = True, vary_rho: int = UPDATE_RHO_BY_TEN_PERCENT,
tau_incr: float = 2, tau_decr: float = 2, mu_res: float = 10,
mu_merit: float = 1000) -> None:
"""Defines parameters for ADMM optimizer and their default values.
Args:
rho_initial: Initial value of rho parameter of ADMM.
factor_c: Penalizing factor for equality constraints, when mapping to QUBO.
beta: Penalization for y decision variables.
max_iter: Maximum number of iterations for ADMM.
tol: Tolerance for the residual convergence.
max_time: Maximum running time (in seconds) for ADMM.
three_block: Boolean flag to select the 3-block ADMM implementation.
vary_rho: Flag to select the rule to update rho.
If set to 0, then rho increases by 10% at each iteration.
If set to 1, then rho is modified according to primal and dual residuals.
tau_incr: Parameter used in the rho update (UPDATE_RHO_BY_RESIDUALS).
The update rule can be found in:
Boyd, S., Parikh, N., Chu, E., Peleato, B., & Eckstein, J. (2011).
Distributed optimization and statistical learning via the alternating
direction method of multipliers.
                Foundations and Trends® in Machine Learning, 3(1), 1-122.
tau_decr: Parameter used in the rho update (UPDATE_RHO_BY_RESIDUALS).
mu_res: Parameter used in the rho update (UPDATE_RHO_BY_RESIDUALS).
mu_merit: Penalization for constraint residual. Used to compute the merit values.
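        Example:
            A minimal construction sketch; the parameter values below are
            illustrative only, not tuned recommendations::

                params = ADMMParameters(rho_initial=1001, beta=1000, factor_c=900,
                                        max_iter=100, three_block=True)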
"""
super().__init__()
self.mu_merit = mu_merit
self.mu_res = mu_res
self.tau_decr = tau_decr
self.tau_incr = tau_incr
self.vary_rho = vary_rho
self.three_block = three_block
self.max_time = max_time
self.tol = tol
self.max_iter = max_iter
self.factor_c = factor_c
self.beta = beta
self.rho_initial = rho_initial
class ADMMState:
"""Internal computation state of the ADMM implementation.
    The state keeps track of various variables that are updated during the
    solution process. The values are specific to the problem being solved, and
    the state is re-created for each new optimization problem. The state is
    returned as part of the optimization result.
"""
def __init__(self,
op: QuadraticProgram,
binary_indices: List[int],
continuous_indices: List[int],
rho_initial: float) -> None:
"""
Args:
op: The optimization problem being solved.
binary_indices: Indices of the binary decision variables of the original problem.
continuous_indices: Indices of the continuous decision variables of the original
problem.
rho_initial: Initial value of the rho parameter.
"""
super().__init__()
# Optimization problem itself
self.op = op
# Indices of the variables
self.binary_indices = binary_indices
self.continuous_indices = continuous_indices
        # define heavily used matrices; they are used at each iteration,
        # so we cache them as np.ndarrays
# pylint:disable=invalid-name
# objective
self.q0 = None
self.c0 = None
self.q1 = None
self.c1 = None
# constraints
self.a0 = None
self.b0 = None
# These are the parameters that are updated in the ADMM iterations.
self.u = np.zeros(len(continuous_indices))
binary_size = len(binary_indices)
self.x0 = np.zeros(binary_size)
self.z = np.zeros(binary_size)
self.z_init = self.z
self.y = np.zeros(binary_size)
self.lambda_mult = np.zeros(binary_size)
# The following structures store quantities obtained in each ADMM iteration.
self.cost_iterates = []
self.residuals = []
self.dual_residuals = []
self.cons_r = []
self.merits = []
self.lambdas = []
self.x0_saved = []
self.u_saved = []
self.z_saved = []
self.y_saved = []
self.rho = rho_initial
self.binary_equality_constraints = [] # lin. eq. constraints with bin. vars. only
self.equality_constraints = [] # all equality constraints
self.inequality_constraints = [] # all inequality constraints
class ADMMOptimizationResult(OptimizationResult):
""" ADMMOptimization Result."""
def __init__(self, x: Optional[Any] = None, fval: Optional[Any] = None,
state: Optional[ADMMState] = None, results: Optional[Any] = None) -> None:
super().__init__(x, fval, results or state)
self._state = state
@property
def state(self) -> Optional[ADMMState]:
""" returns state """
return self._state
class ADMMOptimizer(OptimizationAlgorithm):
"""An implementation of the ADMM-based heuristic.
This algorithm is introduced in [1].
**References:**
[1] Gambella, C., & Simonetto, A. (2020). Multi-block ADMM Heuristics for Mixed-Binary
Optimization on Classical and Quantum Computers. arXiv preprint arXiv:2001.02069.
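
    Example:
        A minimal usage sketch, assuming the ``qiskit.optimization`` package
        layout and that CPLEX (the default sub-problem optimizer) is installed;
        the construction of the problem itself is elided::

            from qiskit.optimization import QuadraticProgram

            problem = QuadraticProgram()
            # ... specify the variables, objective and constraints of the problem ...

            optimizer = ADMMOptimizer(params=ADMMParameters(max_iter=50))
            result = optimizer.solve(problem)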
"""
def __init__(self, qubo_optimizer: Optional[OptimizationAlgorithm] = None,
continuous_optimizer: Optional[OptimizationAlgorithm] = None,
params: Optional[ADMMParameters] = None) -> None:
"""
Args:
qubo_optimizer: An instance of OptimizationAlgorithm that can effectively solve
QUBO problems.
continuous_optimizer: An instance of OptimizationAlgorithm that can solve
continuous problems.
params: An instance of ADMMParameters.
Raises:
NameError: CPLEX is not installed.
"""
super().__init__()
self._log = logging.getLogger(__name__)
# create default params if not present
self._params = params or ADMMParameters()
# create optimizers if not specified
self._qubo_optimizer = qubo_optimizer or CplexOptimizer()
self._continuous_optimizer = continuous_optimizer or CplexOptimizer()
# internal state where we'll keep intermediate solution
        # here we just declare the class variable; it is initialized and
        # kept up to date in the solve method.
self._state = None # Optional[ADMMState]
    def get_compatibility_msg(self, problem: QuadraticProgram) -> Optional[str]:
"""Checks whether a given problem can be solved with the optimizer implementing this method.
Args:
problem: The optimization problem to check compatibility.
Returns:
            An empty string if the problem is compatible with the ADMM optimizer,
            otherwise a message describing the incompatibility.
"""
msg = ''
# 1. get bin/int and continuous variable indices
bin_int_indices = self._get_variable_indices(problem, Variable.Type.BINARY)
continuous_indices = self._get_variable_indices(problem, Variable.Type.CONTINUOUS)
# 2. binary and continuous variables are separable in objective
for bin_int_index in bin_int_indices:
for continuous_index in continuous_indices:
coeff = problem.objective.quadratic[bin_int_index, continuous_index]
if coeff != 0:
# binary and continuous vars are mixed.
msg += 'Binary and continuous variables are not separable in the objective. '
        # return the incompatibility message; an empty message means the problem is compatible
return msg
    def solve(self, problem: QuadraticProgram) -> ADMMOptimizationResult:
"""Tries to solves the given problem using ADMM algorithm.
Args:
problem: The problem to be solved.
Returns:
The result of the optimizer applied to the problem.
Raises:
QiskitOptimizationError: If the problem is not compatible with the ADMM optimizer.
"""
# check compatibility and raise exception if incompatible
msg = self.get_compatibility_msg(problem)
if len(msg) > 0:
raise QiskitOptimizationError('Incompatible problem: {}'.format(msg))
# debug
self._log.debug("Initial problem: %s", problem.export_as_lp_string())
# map integer variables to binary variables
from ..converters.integer_to_binary import IntegerToBinary
int2bin = IntegerToBinary()
problem = int2bin.encode(problem)
# we deal with minimization in the optimizer, so turn the problem to minimization
problem, sense = self._turn_to_minimization(problem)
# parse problem and convert to an ADMM specific representation.
binary_indices = self._get_variable_indices(problem, Variable.Type.BINARY)
continuous_indices = self._get_variable_indices(problem, Variable.Type.CONTINUOUS)
# create our computation state.
self._state = ADMMState(problem, binary_indices,
continuous_indices, self._params.rho_initial)
# convert optimization problem to a set of matrices and vector that are used
# at each iteration.
self._convert_problem_representation()
start_time = time.time()
        # we have not started our computations yet, so the elapsed time is initialized as zero.
elapsed_time = 0
iteration = 0
residual = 1.e+2
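        # main ADMM loop: step 1 updates the binary variables x0, step 2 updates
        # the continuous variables u and z, the optional step 3 updates y, and
        # finally the lambda multipliers and rho are updated.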
while (iteration < self._params.max_iter and residual > self._params.tol) \
and (elapsed_time < self._params.max_time):
if binary_indices:
op1 = self._create_step1_problem()
self._state.x0 = self._update_x0(op1)
# debug
self._log.debug("Step 1 sub-problem: %s", op1.export_as_lp_string())
# else, no binary variables exist, and no update to be done in this case.
# debug
self._log.debug("x0=%s", self._state.x0)
op2 = self._create_step2_problem()
self._state.u, self._state.z = self._update_x1(op2)
# debug
self._log.debug("Step 2 sub-problem: %s", op2.export_as_lp_string())
self._log.debug("u=%s", self._state.u)
self._log.debug("z=%s", self._state.z)
if self._params.three_block:
if binary_indices:
op3 = self._create_step3_problem()
self._state.y = self._update_y(op3)
# debug
self._log.debug("Step 3 sub-problem: %s", op3.export_as_lp_string())
# debug
self._log.debug("y=%s", self._state.y)
self._state.lambda_mult = self._update_lambda_mult()
# debug
self._log.debug("lambda: %s", self._state.lambda_mult)
cost_iterate = self._get_objective_value()
constraint_residual = self._get_constraint_residual()
residual, dual_residual = self._get_solution_residuals(iteration)
merit = self._get_merit(cost_iterate, constraint_residual)
# debug
self._log.debug("cost_iterate=%s, cr=%s, merit=%s",
cost_iterate, constraint_residual, merit)
# costs are saved with their original sign.
self._state.cost_iterates.append(cost_iterate)
self._state.residuals.append(residual)
self._state.dual_residuals.append(dual_residual)
self._state.cons_r.append(constraint_residual)
self._state.merits.append(merit)
self._state.lambdas.append(np.linalg.norm(self._state.lambda_mult))
self._state.x0_saved.append(self._state.x0)
self._state.u_saved.append(self._state.u)
self._state.z_saved.append(self._state.z)
            self._state.y_saved.append(self._state.y)
self._update_rho(residual, dual_residual)
iteration += 1
elapsed_time = time.time() - start_time
binary_vars, continuous_vars, objective_value = self._get_best_merit_solution()
solution = self._revert_solution_indexes(binary_vars, continuous_vars)
# flip the objective sign again if required
objective_value = objective_value * sense
# third parameter is our internal state of computations.
result = ADMMOptimizationResult(solution, objective_value, self._state)
        # restore the original integer variables from their binary encoding
result = int2bin.decode(result)
# debug
self._log.debug("solution=%s, objective=%s at iteration=%s",
solution, objective_value, iteration)
return result
@staticmethod
    def _turn_to_minimization(problem: QuadraticProgram) -> Tuple[QuadraticProgram, float]:
"""
Turns the problem to `ObjSense.MINIMIZE` by flipping the sign of the objective function
if initially it is `ObjSense.MAXIMIZE`. Otherwise returns the original problem.
Args:
problem: a problem to turn to minimization.
Returns:
            A copy of the problem with the flipped objective if the sign flip is
            required, otherwise the original problem, together with the original
            sense of the problem as a numerical value.
"""
sense = problem.objective.sense.value
if problem.objective.sense == QuadraticObjective.Sense.MAXIMIZE:
problem = copy.deepcopy(problem)
problem.objective.sense = QuadraticObjective.Sense.MINIMIZE
problem.objective.constant = (-1) * problem.objective.constant
problem.objective.linear = (-1) * problem.objective.linear.coefficients
problem.objective.quadratic = (-1) * problem.objective.quadratic.coefficients
return problem, sense
@staticmethod
def _get_variable_indices(op: QuadraticProgram, var_type: Variable.Type) -> List[int]:
"""Returns a list of indices of the variables of the specified type.
Args:
op: Optimization problem.
var_type: type of variables to look for.
Returns:
List of indices.
"""
indices = []
for i, variable in enumerate(op.variables):
if variable.vartype == var_type:
indices.append(i)
return indices
def _get_current_solution(self) -> np.ndarray:
"""
Returns current solution of the problem.
Returns:
An array of the current solution.
"""
return self._revert_solution_indexes(self._state.x0, self._state.u)
def _revert_solution_indexes(self, binary_vars: np.ndarray, continuous_vars: np.ndarray) \
-> np.ndarray:
"""Constructs a solution array where variables are stored in the correct order.
Args:
binary_vars: solution for binary variables
continuous_vars: solution for continuous variables
Returns:
A solution array.
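        Example:
            A toy illustration with hypothetical values: for binary_indices=[0, 2],
            continuous_indices=[1], binary_vars=[1, 0] and continuous_vars=[0.5],
            the returned array is [1, 0.5, 0].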
"""
solution = np.zeros(len(self._state.binary_indices) + len(self._state.continuous_indices))
# restore solution at the original index location
solution.put(self._state.binary_indices, binary_vars)
solution.put(self._state.continuous_indices, continuous_vars)
return solution
def _convert_problem_representation(self) -> None:
"""Converts problem representation into set of matrices and vectors."""
binary_var_indices = set(self._state.binary_indices)
# separate constraints
for constraint in self._state.op.linear_constraints:
if constraint.sense == Constraint.Sense.EQ:
self._state.equality_constraints.append(constraint)
# verify that there are only binary variables in the constraint
# this is to build A0, b0 in step 1
constraint_var_indices = set(constraint.linear.to_dict().keys())
if constraint_var_indices.issubset(binary_var_indices):
self._state.binary_equality_constraints.append(constraint)
elif constraint.sense in (Constraint.Sense.LE, Constraint.Sense.GE):
self._state.inequality_constraints.append(constraint)
# separate quadratic constraints into eq and non-eq
for constraint in self._state.op.quadratic_constraints:
if constraint.sense == Constraint.Sense.EQ:
self._state.equality_constraints.append(constraint)
elif constraint.sense in (Constraint.Sense.LE, Constraint.Sense.GE):
self._state.inequality_constraints.append(constraint)
# objective
self._state.q0 = self._get_q(self._state.binary_indices)
self._state.c0 = self._state.op.objective.linear.to_array()[self._state.binary_indices]
self._state.q1 = self._get_q(self._state.continuous_indices)
self._state.c1 = self._state.op.objective.linear.to_array()[self._state.continuous_indices]
# equality constraints with binary vars only
self._state.a0, self._state.b0 = self._get_a0_b0()
def _get_q(self, variable_indices: List[int]) -> np.ndarray:
"""Constructs a quadratic matrix for the variables with the specified indices
from the quadratic terms in the objective.
Args:
variable_indices: variable indices to look for.
Returns:
            A matrix as a numpy array of shape (len(variable_indices), len(variable_indices)).
"""
size = len(variable_indices)
q = np.zeros(shape=(size, size))
# fill in the matrix
# in fact we use re-indexed variables
for i, var_index_i in enumerate(variable_indices):
for j, var_index_j in enumerate(variable_indices):
                q[i, j] = self._state.op.objective.quadratic[var_index_i, var_index_j]
return q
    def _get_a0_b0(self) -> Tuple[np.ndarray, np.ndarray]:
"""Constructs a matrix and a vector from the constraints in a form of Ax = b, where
x is a vector of binary variables.
Returns:
Corresponding matrix and vector as numpy arrays.
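        Example:
            A hypothetical illustration: with binary_indices=[0, 1] and a single
            binary equality constraint x_0 + 2 x_1 = 1, the returned matrix is
            [[1, 2]] and the vector is [1].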
"""
matrix = []
vector = []
for constraint in self._state.binary_equality_constraints:
row = constraint.linear.to_array().take(self._state.binary_indices).tolist()
matrix.append(row)
vector.append(constraint.rhs)
if len(matrix) != 0:
np_matrix = np.array(matrix)
np_vector = np.array(vector)
else:
np_matrix = np.array([0] * len(self._state.binary_indices)).reshape((1, -1))
np_vector = np.zeros(shape=(1,))
return np_matrix, np_vector
def _create_step1_problem(self) -> QuadraticProgram:
"""Creates a step 1 sub-problem.
Returns:
A newly created optimization problem.
"""
op1 = QuadraticProgram()
binary_size = len(self._state.binary_indices)
# create the same binary variables.
for i in range(binary_size):
op1.binary_var(name="x0_" + str(i + 1))
# prepare and set quadratic objective.
quadratic_objective = self._state.q0 +\
self._params.factor_c / 2 * np.dot(self._state.a0.transpose(), self._state.a0) +\
self._state.rho / 2 * np.eye(binary_size)
op1.objective.quadratic = quadratic_objective
# prepare and set linear objective.
linear_objective = self._state.c0 - \
self._params.factor_c * np.dot(self._state.b0, self._state.a0) + \
self._state.rho * (- self._state.y - self._state.z) + \
self._state.lambda_mult
op1.objective.linear = linear_objective
return op1
def _create_step2_problem(self) -> QuadraticProgram:
"""Creates a step 2 sub-problem.
Returns:
A newly created optimization problem.
"""
op2 = copy.deepcopy(self._state.op)
# replace binary variables with the continuous ones bound in [0,1]
# x0(bin) -> z(cts)
# u (cts) are still there unchanged
for i, var_index in enumerate(self._state.binary_indices):
variable = op2.variables[var_index]
variable.vartype = Variable.Type.CONTINUOUS
variable.upperbound = 1.
variable.lowerbound = 0.
            # replace the Q0 objective; no need to handle the min/max sense here,
            # since the problem has already been converted to minimization
op2.objective.quadratic[var_index, var_index] = self._state.rho / 2
# replacing linear objective
op2.objective.linear[var_index] = -1 * self._state.lambda_mult[i] - self._state.rho * \
(self._state.x0[i] - self._state.y[i])
# remove A0 x0 = b0 constraints
for constraint in self._state.binary_equality_constraints:
op2.remove_linear_constraint(constraint.name)
return op2
def _create_step3_problem(self) -> QuadraticProgram:
"""Creates a step 3 sub-problem.
Returns:
A newly created optimization problem.
"""
op3 = QuadraticProgram()
# add y variables.
binary_size = len(self._state.binary_indices)
for i in range(binary_size):
op3.continuous_var(name="y_" + str(i + 1), lowerbound=-np.inf, upperbound=np.inf)
# set quadratic objective y
quadratic_y = self._params.beta / 2 * np.eye(binary_size) + \
self._state.rho / 2 * np.eye(binary_size)
op3.objective.quadratic = quadratic_y
# set linear objective for y
linear_y = - self._state.lambda_mult - self._state.rho * (self._state.x0 - self._state.z)
op3.objective.linear = linear_y
return op3
def _update_x0(self, op1: QuadraticProgram) -> np.ndarray:
"""Solves the Step1 QuadraticProgram via the qubo optimizer.
Args:
op1: the Step1 QuadraticProgram.
Returns:
A solution of the Step1, as a numpy array.
"""
return np.asarray(self._qubo_optimizer.solve(op1).x)
    def _update_x1(self, op2: QuadraticProgram) -> Tuple[np.ndarray, np.ndarray]:
"""Solves the Step2 QuadraticProgram via the continuous optimizer.
Args:
op2: the Step2 QuadraticProgram
Returns:
A solution of the Step2, as a pair of numpy arrays.
            The first array contains the values of the decision variables u, and
            the second array contains the values of the decision variables z.
"""
vars_op2 = np.asarray(self._continuous_optimizer.solve(op2).x)
vars_u = vars_op2.take(self._state.continuous_indices)
vars_z = vars_op2.take(self._state.binary_indices)
return vars_u, vars_z
def _update_y(self, op3: QuadraticProgram) -> np.ndarray:
"""Solves the Step3 QuadraticProgram via the continuous optimizer.
Args:
op3: the Step3 QuadraticProgram
Returns:
A solution of the Step3, as a numpy array.
"""
return np.asarray(self._continuous_optimizer.solve(op3).x)
    def _get_best_merit_solution(self) -> Tuple[np.ndarray, np.ndarray, float]:
"""The ADMM solution is that for which the merit value is the min
* sol: Iterate with the min merit value
* sol_val: Value of sol, according to the original objective
Returns:
A tuple of (binary_vars, continuous_vars, sol_val), where
* binary_vars: binary variable values with the min merit value
* continuous_vars: continuous variable values with the min merit value
* sol_val: Value of the objective function
"""
it_min_merits = self._state.merits.index(min(self._state.merits))
binary_vars = self._state.x0_saved[it_min_merits]
continuous_vars = self._state.u_saved[it_min_merits]
sol_val = self._state.cost_iterates[it_min_merits]
return binary_vars, continuous_vars, sol_val
def _update_lambda_mult(self) -> np.ndarray:
"""
        Updates the values of the lambda multipliers, given the updated iterates
x0, z, and y.
Returns: The updated array of values of lambda multiplier.
"""
return self._state.lambda_mult + \
self._state.rho * (self._state.x0 - self._state.z - self._state.y)
def _update_rho(self, primal_residual: float, dual_residual: float) -> None:
"""Updating the rho parameter in ADMM.
Args:
primal_residual: primal residual
dual_residual: dual residual
"""
if self._params.vary_rho == UPDATE_RHO_BY_TEN_PERCENT:
# Increase rho, to aid convergence.
if self._state.rho < 1.e+10:
self._state.rho *= 1.1
elif self._params.vary_rho == UPDATE_RHO_BY_RESIDUALS:
if primal_residual > self._params.mu_res * dual_residual:
self._state.rho = self._params.tau_incr * self._state.rho
            elif dual_residual > self._params.mu_res * primal_residual:
                # decrease rho per the residual balancing rule of Boyd et al.
                self._state.rho = self._state.rho / self._params.tau_decr
def _get_constraint_residual(self) -> float:
"""Compute violation of the constraints of the original problem, as:
* norm 1 of the body-rhs of eq. constraints
* -1 * min(body - rhs, 0) for geq constraints
* max(body - rhs, 0) for leq constraints
Returns:
Violation of the constraints as a float value
"""
solution = self._get_current_solution()
# equality constraints
cr_eq = 0
for constraint in self._state.equality_constraints:
cr_eq += np.abs(constraint.evaluate(solution) - constraint.rhs)
# inequality constraints
cr_ineq = 0
for constraint in self._state.inequality_constraints:
sense = -1 if constraint.sense == Constraint.Sense.GE else 1
cr_ineq += max(sense * (constraint.evaluate(solution) - constraint.rhs), 0)
return cr_eq + cr_ineq
def _get_merit(self, cost_iterate: float, constraint_residual: float) -> float:
"""Compute merit value associated with the current iterate
Args:
cost_iterate: Cost at the certain iteration.
constraint_residual: Value of violation of the constraints.
Returns:
Merit value as a float
"""
return cost_iterate + self._params.mu_merit * constraint_residual
def _get_objective_value(self) -> float:
"""Computes the value of the objective function.
Returns:
Value of the objective function as a float
"""
return self._state.op.objective.evaluate(self._get_current_solution())
    def _get_solution_residuals(self, iteration: int) -> Tuple[float, float]:
        """Computes the primal and dual residuals.
        Args:
            iteration: Iteration number.
        Returns:
            A pair (r, s) of the primal and dual residuals, where
            r = ||x0 - z - y|| and s = rho * ||z - z_prev||.
        """
elements = self._state.x0 - self._state.z - self._state.y
primal_residual = np.linalg.norm(elements)
if iteration > 0:
elements_dual = self._state.z - self._state.z_saved[iteration - 1]
else:
elements_dual = self._state.z - self._state.z_init
dual_residual = self._state.rho * np.linalg.norm(elements_dual)
return primal_residual, dual_residual