Source code for DETAlgs.lshadersp

import copy
from typing import List

import numpy as np
from detpy.DETAlgs.base import BaseAlg
from detpy.DETAlgs.crossover_methods.binomial_crossover import BinomialCrossover
from detpy.DETAlgs.data.alg_data import LSHADERSPData

from random import randint

from detpy.DETAlgs.methods.methods_lshadersp import archive_reduction, rank_selection
from detpy.DETAlgs.mutation_methods.current_to_pbest_r import MutationCurrentToPBestR
from detpy.DETAlgs.random.random_value_generator import RandomValueGenerator
from detpy.math_functions.lehmer_mean import LehmerMean
from detpy.models.enums.boundary_constrain import fix_boundary_constraints_with_parent
from detpy.models.enums.optimization import OptimizationType
from detpy.models.member import Member
from detpy.models.population import Population


[docs] class LSHADERSP(BaseAlg): """ L-SHADE-RSP: LSHADE Algorithm with a Rank-based Selective Pressure Strategy (RSP) Links: https://www.scitepress.org/papers/2018/68525/68525.pdf References: Shakhnaz Akhmedova, Vladimir Stanovov and Eugene Semenkin (2018). Research on In Proceedings of the 15th International Conference on Informatics in Control, Automation and Robotics (ICINCO 2018) - Volume 1, pages 149-155 """ def __init__(self, params: LSHADERSPData, db_conn=None, db_auto_write=False): super().__init__(LSHADERSP.__name__, params, db_conn, db_auto_write) self._k = params.scaling_factor_for_rank_selection # Scaling factor for rank selection self._H = params.memory_size # Memory size for f and cr adaptation self._memory_F = np.full(self._H, 0.3) # Initial memory for F self._memory_Cr = np.full(self._H, 0.8) # Initial memory for Cr self._memory_F[self._H - 1] = 0.9 # One cell of the memory for F must be set to 0.9 self._memory_Cr[self._H - 1] = 0.9 # One cell of the memory for Cr must be set to 0.9 self._population_size_reduction_strategy = params.population_reduction_strategy self._min_pop_size = params.minimum_population_size # Minimal population size self._start_population_size = self.population_size self._archive_size = self.population_size # Size of the archive self._archive = [] # Archive for storing the members from old populations self._lehmer_mean_func = LehmerMean() # Class for Lehmer mean calculation self._success_f = [] # List of successful F values self._success_cr = [] # List of successful Cr values self._f = np.random.standard_cauchy() * 0.1 + np.random.choice(self._memory_F) self._fw = 0.7 * self._f if self.nfe < 0.2 * self.nfe_max else ( 0.8 * self._f if self.nfe < 0.4 * self.nfe_max else 1.2 * self._f) self._difference_fitness_success = [] # Define a terminal value for memory_Cr. It indicates that no successful Cr was found in the epoch. self._TERMINAL = np.nan # We need this value for checking close to zero in update_memory self._EPSILON = 0.00001 self._binomial_crossing = BinomialCrossover() self._random_value_gen = RandomValueGenerator() def _adapt_parameters(self, fitness_improvement: List[float]): """ Update the memory for F and Cr based on the fitness improvement of the crossover and mutation steps. Parameters: - fitness_improvement: List of fitness improvement values for successful individuals. """ epsilon = 1e-10 # Small value to avoid division by zero total_improvement = np.sum(fitness_improvement) if total_improvement > epsilon: # Compute weights based on fitness improvement weights = fitness_improvement / total_improvement # Randomly select a memory index to update # Ensure one cell in the memory always holds the fixed value 0.9 so we have index -1 r = np.random.randint(0, self._H - 1) # Compute new F value as the Lehmer mean of successful F values if len(self._success_f) > 0: new_f = self._lehmer_mean_func.evaluate(self._success_f, weights, 2) new_f = np.clip(new_f, 0, 1) # Clip to [0, 1] # Update memory_F self._memory_F[r] = (self._memory_F[r] + new_f) / 2 # Mean of old and new F if np.isclose(total_improvement, 0.0, atol=self._EPSILON): self._memory_Cr[r] = self._TERMINAL else: new_cr = np.sum(weights * self._success_cr) new_cr = np.clip(new_cr, 0, 1) # Clip to [0, 1] # Update memory_Cr self._memory_Cr[r] = (self._memory_Cr[r] + new_cr) / 2 # Mean of old and new Cr # Clear the lists of successful F and Cr values for the next generation self._difference_fitness_success = [] self._success_cr = [] self._success_f = [] def _get_best_members(self, best_members: List[Member], archive: List[Member]) -> List[Member]: """ Combine and sort the best members from the current population and the archive, then return the top members. Parameters: - best_members (List[Member]): The best members from the current population. - archive (List[Member]): The archive of old population members. Returns: - List[Member]: The top combined members from best_members and archive. """ size_best_members_to_select = len(best_members) combined = np.concatenate((best_members, archive)) if self._pop.optimization == OptimizationType.MINIMIZATION: # For minimization, sort the combined array by the smallest fitness value combined = sorted(combined, key=lambda member: member.fitness_value) else: # For maximization, sort the combined array by the largest fitness value combined = sorted(combined, key=lambda member: member.fitness_value, reverse=True) return combined[:size_best_members_to_select] def _mutate(self, population: Population, nfe: int, nfe_max: int, pop_size: int, f_table: List[float], fw_table: List[float] ) -> Population: """ Perform mutation step for the population. Parameters: - population (Population): The population to mutate. - nfe (int): The current number of function evaluations. - nfe_max (int): The maximum number of function evaluations. - pop_size (int): The size of the population. - f_table (List[float]): A list of scaling factors F for the current epoch. - fw_table (List[float]): A list of dynamic scaling factors Fw for the current epoch. Returns: A new Population with mutated members. """ new_members = [] for i in range(population.size): members = np.append(population.members, self._archive) r1, r2 = rank_selection(members, self._k, self.optimization_type) p = 0.085 + (0.085 * nfe) / nfe_max how_many_best_to_select = p * pop_size how_many_best_to_select = max(1, how_many_best_to_select) # at least one best member also for pbest it # is require minimum 4 members for selection: current, best, r1, r2 best_members = population.get_best_members(int(how_many_best_to_select)) all_best_members = self._get_best_members(best_members, self._archive) # get best members from the population and archive rand_index = randint(0, len(all_best_members) - 1) # First member in argument is the actual member, second is from the best member from the population and archive, # third and fourth are random members from the population and archive using the rank selection new_member = MutationCurrentToPBestR.mutate(population.members[i], all_best_members[rand_index], members[r1], members[r2], f_table[i], fw_table[i]) new_members.append(new_member) return Population.with_new_members(population, new_members) def _selection(self, origin_population: Population, modified_population: Population, cr_table: List[float], f_table: List[float]): """ Perform selection operation for the population. Parameters: - origin_population (Population): The original population. - modified_population (Population): The modified population - cr_table (List[float]): A list of crossover rates Cr for the current epoch. - f_table (List[float]): A list of scaling factors F for the current epoch. Returns: A new population with the selected members. """ optimization = origin_population.optimization new_members = [] for i in range(origin_population.size): if optimization == OptimizationType.MINIMIZATION: if origin_population.members[i] < modified_population.members[i]: new_members.append(copy.deepcopy(origin_population.members[i])) else: self._archive.append(copy.deepcopy(origin_population.members[i])) new_members.append(copy.deepcopy(modified_population.members[i])) self._success_f.append(f_table[i]) self._success_cr.append(cr_table[i]) self._difference_fitness_success.append( origin_population.members[i].fitness_value - modified_population.members[i].fitness_value) elif optimization == OptimizationType.MAXIMIZATION: if origin_population.members[i] > modified_population.members[i]: new_members.append(copy.deepcopy(origin_population.members[i])) else: self._archive.append(copy.deepcopy(origin_population.members[i])) new_members.append(copy.deepcopy(modified_population.members[i])) self._success_f.append(f_table[i]) self._success_cr.append(cr_table[i]) self._difference_fitness_success.append( modified_population.members[i].fitness_value - origin_population.members[i].fitness_value) return Population.with_new_members(origin_population, new_members) def _update_population_size(self, nfe: int, total_nfe: int, start_pop_size: int, min_pop_size: int): """ Calculate new population size using Linear Population Size Reduction (LPSR). Parameters: - start_pop_size (int): The initial population size. - nfe (int): The current nfe. - total_nfe (int): The total number of nfe. - min_pop_size (int): The minimum population size. """ new_size = self._population_size_reduction_strategy.get_new_population_size( nfe, total_nfe, start_pop_size, min_pop_size ) self._pop.resize(new_size) # Update archive size proportionally self._archive_size = new_size def _calculate_factors_for_epoch(self, pop_size): """ Calculate the mutation parameters (F, Cr, Fw) for the current epoch based on the LSHADE-RSP algorithm. Parameters: - pop_size (int): The population size. """ f_table = [] cr_table = [] fw_table = [] for i in range(pop_size): ri = np.random.randint(0, self._H) mean_f = self._memory_F[ri] mean_cr = self._memory_Cr[ri] if np.isnan(self._memory_Cr[ri]): cr = 0.0 else: # Generate Cr from normal distribution cr = self._random_value_gen.generate_normal(mean_cr, 0.1, 0.0, 1.0) # Generate F from Cauchy distribution (ensure F > 0) f = self._random_value_gen.generate_cauchy_greater_than_zero(mean_f, 0.1, 0.0, 1.0) # Constrain F based on NFE if self.nfe < 0.6 * self.nfe_max: f = min(f, 0.7) # F <= 0.7 for the first 60% of evaluations else: f = min(f, 1.0) # F <= 1.0 for the remaining evaluations f_table.append(f) cr_table.append(cr) # Set Fw based on the current NFE if self.nfe < 0.2 * self.nfe_max: fw = 0.7 * f elif self.nfe < 0.4 * self.nfe_max: fw = 0.8 * f else: fw = 1.2 * f fw_table.append(fw) return f_table, cr_table, fw_table
[docs] def next_epoch(self): """ Perform the next epoch of the L-SHADE-RSP algorithm. """ f_table, cr_table, fw_table = self._calculate_factors_for_epoch(self._pop.size) mutant = self._mutate(self._pop, self.nfe, self.nfe_max, self._pop.size, f_table, fw_table) trial = self._binomial_crossing.crossover_population(self._pop, mutant, cr_table) fix_boundary_constraints_with_parent(self._pop, trial, self.boundary_constraints_fun) trial.update_fitness_values(self._function.eval, self.parallel_processing) new_pop = self._selection(self._pop, trial, cr_table, f_table) archive_reduction(self._archive, self._archive_size, self.optimization_type) # Override the entire population with the new population self._pop = new_pop self._adapt_parameters(self._difference_fitness_success) self._update_population_size(self.nfe, self.nfe_max, self._start_population_size, self._min_pop_size)