import copy
import numpy as np
from typing import List
from detpy.DETAlgs.archive_reduction.archive_reduction import ArchiveReduction
from detpy.DETAlgs.base import BaseAlg
from detpy.DETAlgs.data.alg_data import ALSHADEData
from detpy.DETAlgs.mutation_methods.current_to_pbest_1 import MutationCurrentToPBest1
from detpy.DETAlgs.mutation_methods.current_to_xamean import MutationCurrentToXamean
from detpy.DETAlgs.random.index_generator import IndexGenerator
from detpy.DETAlgs.random.random_value_generator import RandomValueGenerator
from detpy.math_functions.lehmer_mean import LehmerMean
from detpy.models.enums.boundary_constrain import fix_boundary_constraints_with_parent
from detpy.models.enums.optimization import OptimizationType
from detpy.models.population import Population
from detpy.DETAlgs.crossover_methods.binomial_crossover import BinomialCrossover
class ALSHADE(BaseAlg):
"""
ALSHADE: A novel adaptive L-SHADE algorithm and its application in UAV swarm resource configuration problem
Links:
https://www.sciencedirect.com/science/article/abs/pii/S0020025522004893
References:
Li, Y., Han, T., Zhou, H., Tang, S., & Zhao, H. (2022). A novel adaptive L-SHADE algorithm and
its application in UAV swarm resource configuration problem. Information Sciences, 606, 350–367.
https://doi.org/10.1016/j.ins.2022.05.058
"""
def __init__(self, params: ALSHADEData, db_conn=None, db_auto_write=False):
super().__init__(ALSHADE.__name__, params, db_conn, db_auto_write)
self._lehmer_mean = LehmerMean()
self._H = params.memory_size
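        # Historical memories for F and Cr (H slots each). Slot 0 is pinned to 0.9 and is never
        # overwritten: _index_update_f_and_cr starts at 1 and wraps back to 1 in _update_memory.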
self._memory_F = np.full(self._H, 0.5)
self._memory_Cr = np.full(self._H, 0.5)
self._memory_Cr[0] = 0.9
self._memory_F[0] = 0.9
self._index_update_f_and_cr = 1
self._successCr = []
self._successF = []
self._difference_fitness_success = []
self._e = params.elite_factor
self._P = params.init_probability_mutation_strategy
self._archive_size = params.archive_size
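        # External archive of successful solutions: seeded with the current best member, it receives
        # winning trial vectors in _selection, supplies r2 donors in _mutate, and provides the
        # weighted mean "xamean"; ArchiveReduction trims it every epoch.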
self._archive = [copy.deepcopy(self._pop.get_best_members(1)[0])]
if len(self._archive) > self._archive_size:
self._archive = self._archive[:self._archive_size]
self._min_pop_size = params.minimum_population_size
self._start_population_size = self.population_size
self._mutation_memory = []
self._population_size_reduction_strategy = params.population_size_reduction_strategy
        # Terminal value for memory_Cr: it marks a slot whose successful Cr values produced no
        # measurable fitness improvement; such a slot forces Cr = 0 for individuals that draw it.
self._TERMINAL = np.nan
        # Tolerance for the close-to-zero check on the total fitness improvement in _update_memory
self._EPSILON = 0.00001
self._index_gen = IndexGenerator()
self._random_value_gen = RandomValueGenerator()
self._binomial_crossing = BinomialCrossover()
self._archive_reduction = ArchiveReduction()
def _update_population_size(self, nfe: int, total_nfe: int, start_pop_size: int, min_pop_size: int):
"""
Update the population size based on the current nfe using the specified population size reduction strategy.
Parameters:
        - nfe (int): The number of fitness function evaluations performed so far.
        - total_nfe (int): The total function evaluation budget (nfe_max).
- start_pop_size (int): The initial population size.
- min_pop_size (int): The minimum population size.
"""
new_size = self._population_size_reduction_strategy.get_new_population_size(
nfe, total_nfe, start_pop_size, min_pop_size
)
self._pop.resize(new_size)
def _compute_weighted_archive_mean(self):
"""
Compute the weighted mean from the archive.
"""
m = round(self._e * len(self._archive))
if m == 0:
return np.mean([[chrom.real_value for chrom in ind.chromosomes] for ind in self._archive], axis=0)
reverse_sort = (self._pop.optimization == OptimizationType.MAXIMIZATION)
sorted_archive = sorted(self._archive, key=lambda ind: ind.fitness_value, reverse=reverse_sort)
top_m = sorted_archive[:m]
weights = np.log(m + 0.5) - np.log(np.arange(1, m + 1))
weights /= np.sum(weights)
return np.sum([
[w * chrom.real_value for chrom in ind.chromosomes]
for w, ind in zip(weights, top_m)
], axis=0)
def _mutate(self, population: Population, the_best_to_select_table: List[int], f_table: List[float]) -> Population:
"""
The mutation method of the ALSHADE algorithm.
Parameters:
- population (Population): The current population.
- the_best_to_select_table (List[int]): A list where each element specifies the number of best members to select for the mutation process for the corresponding individual in the population.
- f_table (List[float]): A list of scaling factors (F) for each individual in the population.
"""
new_members = []
        # Stores 1 if "current-to-pbest/1" was used for an individual, 3 if "current-to-xamean" was used
memory = []
pa = np.concatenate((population.members, self._archive))
for i in range(population.size):
r1 = self._index_gen.generate_unique(len(population.members), [i])
            # pa contains both the population members and the archive members
r2 = self._index_gen.generate_unique(len(pa), [i, r1])
p = np.random.rand()
if p < self._P:
best_members = population.get_best_members(the_best_to_select_table[i])
best = best_members[np.random.randint(0, len(best_members))]
mutant = MutationCurrentToPBest1.mutate(
base_member=population.members[i],
best_member=best,
r1=population.members[r1],
r2=pa[r2],
f=f_table[i]
)
memory.append(1)
else:
xamean = self._compute_weighted_archive_mean()
mutant = MutationCurrentToXamean.mutate(
base_member=population.members[i],
xamean=xamean,
r1=population.members[r1],
r2=pa[r2],
f=f_table[i]
)
# current to xamean
memory.append(3)
new_members.append(mutant)
        # Remember which strategy was used for each individual; _selection reads this to adapt self._P
        self._mutation_memory = memory
        return Population.with_new_members(population, new_members)
def _selection(self, origin: Population, modified: Population, ftable: List[float], cr_table: List[float]):
"""
The selection method of the ALSHADE algorithm.
Parameters:
        - origin (Population): The current population.
        - modified (Population): The population after mutation and crossover.
        - ftable (List[float]): A list of scaling factors (F) for each individual in the population.
- cr_table (List[float]): A list of crossover rates (CR) for each individual in the population.
"""
new_members = []
memory_flags = self._mutation_memory
for i in range(origin.size):
if (origin.optimization == OptimizationType.MINIMIZATION and origin.members[i] <= modified.members[i]) or \
(origin.optimization == OptimizationType.MAXIMIZATION and origin.members[i] >= modified.members[i]):
new_members.append(copy.deepcopy(origin.members[i]))
else:
self._archive.append(copy.deepcopy(modified.members[i]))
self._successF.append(ftable[i])
self._successCr.append(cr_table[i])
df = abs(modified.members[i].fitness_value - origin.members[i].fitness_value)
self._difference_fitness_success.append(df)
new_members.append(copy.deepcopy(modified.members[i]))
a1_all = sum(1 for m in memory_flags if m == 1)
a2_all = sum(1 for m in memory_flags if m == 3)
if origin.optimization == OptimizationType.MINIMIZATION:
a1_better = sum(
1 for m, i in zip(memory_flags, range(origin.size))
if m == 1 and modified.members[i] < origin.members[i]
)
a2_better = sum(
1 for m, i in zip(memory_flags, range(origin.size))
if m == 3 and modified.members[i] < origin.members[i]
)
else: # Maximization
a1_better = sum(
1 for m, i in zip(memory_flags, range(origin.size))
if m == 1 and modified.members[i] > origin.members[i]
)
a2_better = sum(
1 for m, i in zip(memory_flags, range(origin.size))
if m == 3 and modified.members[i] > origin.members[i]
)
if a1_all > 0 and a2_all > 0:
p_a1 = a1_better / a1_all
p_a2 = a2_better / a2_all
self._P += (0.05 * (1 - self._P) * (p_a1 - p_a2) * self.nfe) / self.nfe_max
self._P = min(0.9, max(0.1, self._P))
return Population.with_new_members(origin, new_members)
def _update_memory(self, success_f: List[float], success_cr: List[float], df: List[float]):
"""
The method to update the memory of scaling factors (F) and crossover rates (CR) in the ALSHADE algorithm.
Parameters:
- success_f (List[float]): A list of successful scaling factors F that resulted in better fitness function values from the current epoch.
        - success_cr (List[float]): A list of successful crossover rates CR that resulted in better fitness function values from the current epoch.
        - df (List[float]): A list of absolute differences between the old and new fitness values of the successful replacements.
"""
if len(success_f) > 0 and len(success_cr) > 0:
total = np.sum(df)
weights = np.array(df) / total
if np.isclose(total, 0.0, atol=self._EPSILON):
cr_new = self._TERMINAL
else:
cr_new = self._lehmer_mean.evaluate(success_cr, weights.tolist(), p=2)
f_new = self._lehmer_mean.evaluate(success_f, weights.tolist(), p=2)
f_new = np.clip(f_new, 0, 1)
self._memory_F[self._index_update_f_and_cr] = f_new
self._memory_Cr[self._index_update_f_and_cr] = cr_new
self._index_update_f_and_cr += 1
if self._index_update_f_and_cr >= len(self._memory_F):
self._index_update_f_and_cr = 1
self._successF = []
self._successCr = []
self._difference_fitness_success = []
def _initialize_parameters_for_epoch(self):
"""
Method to create f_table, cr_table and bests_to_select for the epoch.
"""
f_table = []
cr_table = []
bests_to_select = []
for _ in range(self._pop.size):
ri = self._index_gen.generate(0, self._H)
            # A TERMINAL (NaN) entry in memory_Cr forces Cr = 0 for this individual
if np.isnan(self._memory_Cr[ri]):
cr = 0.0
else:
cr = self._random_value_gen.generate_normal(self._memory_Cr[ri], 0.1, 0.0, 1.0)
f = self._random_value_gen.generate_cauchy_greater_than_zero(self._memory_F[ri], 0.1, 0.0, 1.0)
f_table.append(f)
cr_table.append(cr)
min_percentage = 2 / self.population_size
max_percentage = 0.2
            the_best_to_possible_select = self._random_value_gen.generate_count_from_percentage(
                self.population_size, min_percentage, max_percentage
            )
bests_to_select.append(the_best_to_possible_select)
return f_table, cr_table, bests_to_select
def next_epoch(self):
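        """
        Run one epoch of ALSHADE: sample per-individual F, Cr and p-best pool sizes, mutate,
        apply binomial crossover, repair boundary violations against the parent, evaluate the
        trial population, perform selection, shrink the archive, update the F/Cr memories and
        reduce the population size according to the current nfe.
        """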
f_table, cr_table, bests_to_select = self._initialize_parameters_for_epoch()
mutant = self._mutate(self._pop, bests_to_select, f_table)
trial = self._binomial_crossing.crossover_population(self._pop, mutant, cr_table)
fix_boundary_constraints_with_parent(self._pop, trial, self.boundary_constraints_fun)
trial.update_fitness_values(self._function.eval, self.parallel_processing)
self._pop = self._selection(self._pop, trial, f_table, cr_table)
self._archive = self._archive_reduction.reduce_archive(self._archive, self._archive_size, self.population_size)
self._update_memory(self._successF, self._successCr, self._difference_fitness_success)
        self._update_population_size(self.nfe, self.nfe_max, self._start_population_size, self._min_pop_size)