Source code for DETAlgs.al_shade

import copy
import numpy as np
from typing import List

from detpy.DETAlgs.archive_reduction.archive_reduction import ArchiveReduction
from detpy.DETAlgs.base import BaseAlg
from detpy.DETAlgs.data.alg_data import ALSHADEData
from detpy.DETAlgs.mutation_methods.current_to_pbest_1 import MutationCurrentToPBest1
from detpy.DETAlgs.mutation_methods.current_to_xamean import MutationCurrentToXamean
from detpy.DETAlgs.random.index_generator import IndexGenerator
from detpy.DETAlgs.random.random_value_generator import RandomValueGenerator
from detpy.math_functions.lehmer_mean import LehmerMean
from detpy.models.enums.boundary_constrain import fix_boundary_constraints_with_parent
from detpy.models.enums.optimization import OptimizationType
from detpy.models.population import Population
from detpy.DETAlgs.crossover_methods.binomial_crossover import BinomialCrossover


[docs] class ALSHADE(BaseAlg): """ ALSHADE: A novel adaptive L-SHADE algorithm and its application in UAV swarm resource configuration problem Links: https://www.sciencedirect.com/science/article/abs/pii/S0020025522004893 References: Li, Y., Han, T., Zhou, H., Tang, S., & Zhao, H. (2022). A novel adaptive L-SHADE algorithm and its application in UAV swarm resource configuration problem. Information Sciences, 606, 350–367. https://doi.org/10.1016/j.ins.2022.05.058 """ def __init__(self, params: ALSHADEData, db_conn=None, db_auto_write=False): super().__init__(ALSHADE.__name__, params, db_conn, db_auto_write) self._lehmer_mean = LehmerMean() self._H = params.memory_size self._memory_F = np.full(self._H, 0.5) self._memory_Cr = np.full(self._H, 0.5) self._memory_Cr[0] = 0.9 self._memory_F[0] = 0.9 self._index_update_f_and_cr = 1 self._successCr = [] self._successF = [] self._difference_fitness_success = [] self._e = params.elite_factor self._P = params.init_probability_mutation_strategy self._archive_size = params.archive_size self._archive = [copy.deepcopy(self._pop.get_best_members(1)[0])] if len(self._archive) > self._archive_size: self._archive = self._archive[:self._archive_size] self._min_pop_size = params.minimum_population_size self._start_population_size = self.population_size self._mutation_memory = [] self._population_size_reduction_strategy = params.population_size_reduction_strategy self.nfe_max = self.nfe_max # Define a terminal value for memory_Cr. It indicates that no successful Cr was found in the epoch. self._TERMINAL = np.nan # We need this value for checking close to zero in update_memory self._EPSILON = 0.00001 self._index_gen = IndexGenerator() self._random_value_gen = RandomValueGenerator() self._binomial_crossing = BinomialCrossover() self._archive_reduction = ArchiveReduction() def _update_population_size(self, nfe: int, total_nfe: int, start_pop_size: int, min_pop_size: int): """ Update the population size based on the current nfe using the specified population size reduction strategy. Parameters: - nfe (int): The current nfe number. - total_nfe (int): The total number of function evaluations. - start_pop_size (int): The initial population size. - min_pop_size (int): The minimum population size. """ new_size = self._population_size_reduction_strategy.get_new_population_size( nfe, total_nfe, start_pop_size, min_pop_size ) self._pop.resize(new_size) def _compute_weighted_archive_mean(self): """ Compute the weighted mean from the archive. """ m = round(self._e * len(self._archive)) if m == 0: return np.mean([[chrom.real_value for chrom in ind.chromosomes] for ind in self._archive], axis=0) reverse_sort = (self._pop.optimization == OptimizationType.MAXIMIZATION) sorted_archive = sorted(self._archive, key=lambda ind: ind.fitness_value, reverse=reverse_sort) top_m = sorted_archive[:m] weights = np.log(m + 0.5) - np.log(np.arange(1, m + 1)) weights /= np.sum(weights) return np.sum([ [w * chrom.real_value for chrom in ind.chromosomes] for w, ind in zip(weights, top_m) ], axis=0) def _mutate(self, population: Population, the_best_to_select_table: List[int], f_table: List[float]) -> Population: """ The mutation method of the ALSHADE algorithm. Parameters: - population (Population): The current population. - the_best_to_select_table (List[int]): A list where each element specifies the number of best members to select for the mutation process for the corresponding individual in the population. - f_table (List[float]): A list of scaling factors (F) for each individual in the population. """ new_members = [] # Contains 1 if "current-to-pbest/1" was used, 3 if "current to xamean" was used memory = [] pa = np.concatenate((population.members, self._archive)) for i in range(population.size): r1 = self._index_gen.generate_unique(len(population.members), [i]) # Archive is included population and archive members r2 = self._index_gen.generate_unique(len(pa), [i, r1]) p = np.random.rand() if p < self._P: best_members = population.get_best_members(the_best_to_select_table[i]) best = best_members[np.random.randint(0, len(best_members))] mutant = MutationCurrentToPBest1.mutate( base_member=population.members[i], best_member=best, r1=population.members[r1], r2=pa[r2], f=f_table[i] ) memory.append(1) else: xamean = self._compute_weighted_archive_mean() mutant = MutationCurrentToXamean.mutate( base_member=population.members[i], xamean=xamean, r1=population.members[r1], r2=pa[r2], f=f_table[i] ) # current to xamean memory.append(3) new_members.append(mutant) return Population.with_new_members(population, new_members) def _selection(self, origin: Population, modified: Population, ftable: List[float], cr_table: List[float]): """ The selection method of the ALSHADE algorithm. Parameters: - population (Population): The current population. - modified (Population): The population after mutation and crossover. - f_table (List[float]): A list of scaling factors (F) for each individual in the population. - cr_table (List[float]): A list of crossover rates (CR) for each individual in the population. """ new_members = [] memory_flags = self._mutation_memory for i in range(origin.size): if (origin.optimization == OptimizationType.MINIMIZATION and origin.members[i] <= modified.members[i]) or \ (origin.optimization == OptimizationType.MAXIMIZATION and origin.members[i] >= modified.members[i]): new_members.append(copy.deepcopy(origin.members[i])) else: self._archive.append(copy.deepcopy(modified.members[i])) self._successF.append(ftable[i]) self._successCr.append(cr_table[i]) df = abs(modified.members[i].fitness_value - origin.members[i].fitness_value) self._difference_fitness_success.append(df) new_members.append(copy.deepcopy(modified.members[i])) a1_all = sum(1 for m in memory_flags if m == 1) a2_all = sum(1 for m in memory_flags if m == 3) if origin.optimization == OptimizationType.MINIMIZATION: a1_better = sum( 1 for m, i in zip(memory_flags, range(origin.size)) if m == 1 and modified.members[i] < origin.members[i] ) a2_better = sum( 1 for m, i in zip(memory_flags, range(origin.size)) if m == 3 and modified.members[i] < origin.members[i] ) else: # Maximization a1_better = sum( 1 for m, i in zip(memory_flags, range(origin.size)) if m == 1 and modified.members[i] > origin.members[i] ) a2_better = sum( 1 for m, i in zip(memory_flags, range(origin.size)) if m == 3 and modified.members[i] > origin.members[i] ) if a1_all > 0 and a2_all > 0: p_a1 = a1_better / a1_all p_a2 = a2_better / a2_all self._P += (0.05 * (1 - self._P) * (p_a1 - p_a2) * self.nfe) / self.nfe_max self._P = min(0.9, max(0.1, self._P)) return Population.with_new_members(origin, new_members) def _update_memory(self, success_f: List[float], success_cr: List[float], df: List[float]): """ The method to update the memory of scaling factors (F) and crossover rates (CR) in the ALSHADE algorithm. Parameters: - success_f (List[float]): A list of successful scaling factors F that resulted in better fitness function values from the current epoch. - success_cr (List[float]): A list of successful scaling factors CR that resulted in better fitness function values from the current epoch. - df (List[float]): A list of differences between old and new value fitness function """ if len(success_f) > 0 and len(success_cr) > 0: total = np.sum(df) weights = np.array(df) / total if np.isclose(total, 0.0, atol=self._EPSILON): cr_new = self._TERMINAL else: cr_new = self._lehmer_mean.evaluate(success_cr, weights.tolist(), p=2) f_new = self._lehmer_mean.evaluate(success_f, weights.tolist(), p=2) f_new = np.clip(f_new, 0, 1) self._memory_F[self._index_update_f_and_cr] = f_new self._memory_Cr[self._index_update_f_and_cr] = cr_new self._index_update_f_and_cr += 1 if self._index_update_f_and_cr >= len(self._memory_F): self._index_update_f_and_cr = 1 self._successF = [] self._successCr = [] self._difference_fitness_success = [] def _initialize_parameters_for_epoch(self): """ Method to create f_table, cr_table and bests_to_select for the epoch. """ f_table = [] cr_table = [] bests_to_select = [] for _ in range(self._pop.size): ri = self._index_gen.generate(0, self._H) # We check here if we have TERMINAL value in memory_Cr if np.isnan(self._memory_Cr[ri]): cr = 0.0 else: cr = self._random_value_gen.generate_normal(self._memory_Cr[ri], 0.1, 0.0, 1.0) f = self._random_value_gen.generate_cauchy_greater_than_zero(self._memory_F[ri], 0.1, 0.0, 1.0) f_table.append(f) cr_table.append(cr) min_percentage = 2 / self.population_size max_percentage = 0.2 the_best_to_possible_select = self._random_value_gen.generate_count_from_percentage(self.population_size, min_percentage, max_percentage) bests_to_select.append(the_best_to_possible_select) return f_table, cr_table, bests_to_select
[docs] def next_epoch(self): f_table, cr_table, bests_to_select = self._initialize_parameters_for_epoch() mutant = self._mutate(self._pop, bests_to_select, f_table) trial = self._binomial_crossing.crossover_population(self._pop, mutant, cr_table) fix_boundary_constraints_with_parent(self._pop, trial, self.boundary_constraints_fun) trial.update_fitness_values(self._function.eval, self.parallel_processing) self._pop = self._selection(self._pop, trial, f_table, cr_table) self._archive = self._archive_reduction.reduce_archive(self._archive, self._archive_size, self.population_size) self._update_memory(self._successF, self._successCr, self._difference_fitness_success) self._update_population_size(self.nfe, self.nfe_max, self._start_population_size, self._min_pop_size)