Source code for DETAlgs.lshade_epsin

import copy
import math

import numpy as np
from typing import List

from detpy.DETAlgs.archive_reduction.archive_reduction import ArchiveReduction
from detpy.DETAlgs.base import BaseAlg
from detpy.DETAlgs.crossover_methods.binomial_crossover import BinomialCrossover
from detpy.DETAlgs.data.alg_data import LShadeEpsinData
from detpy.DETAlgs.mutation_methods.current_to_pbest_1 import MutationCurrentToPBest1
from detpy.DETAlgs.random.index_generator import IndexGenerator
from detpy.DETAlgs.random.random_value_generator import RandomValueGenerator
from detpy.models.enums.boundary_constrain import fix_boundary_constraints_with_parent
from detpy.models.enums.optimization import OptimizationType
from detpy.models.population import Population


[docs] class LShadeEpsin(BaseAlg): """ LShadeEpsin Links: https://ieeexplore.ieee.org/document/7744163 References: Awad, N. H., Ali, M. Z., Suganthan, P. N., & Reynolds, R. G. (2016). An ensemble sinusoidal parameter adaptation incorporated with L-SHADE for solving CEC2014 benchmark problems. In 2016 IEEE Congress on Evolutionary Computation (CEC) (pp. 2958–2965). 2016 IEEE Congress on Evolutionary Computation (CEC). IEEE. https://doi.org/10.1109/cec.2016.7744163 """ def __init__(self, params: LShadeEpsinData, db_conn=None, db_auto_write=False): super().__init__(LShadeEpsin.__name__, params, db_conn, db_auto_write) self._H = params.memory_size self._memory_F = np.full(self._H, 0.5) self._memory_Cr = np.full(self._H, 0.5) self._success_cr = [] self._success_f = [] self._p = params.best_member_percentage self._success_freg = [] self._difference_fitness_success = [] self._difference_fitness_success_freg = [] self._freq_memory = np.full(self._H, 0.5) self._archive = [] self._archive_size = self.population_size self._archive.extend(copy.deepcopy(self._pop.members)) self._min_pop_size = params.minimum_population_size self._start_population_size = self.population_size self._f_sin_freg = params.f_sin_freq self.population_size_reduction_strategy = params.population_reduction_strategy # Define a terminal value for memory_Cr. It indicates that no successful Cr was found in the epoch. self._TERMINAL = np.nan # We need this value for checking close to zero in update_memory self._EPSILON = 0.00001 self._k_index = 0 self._index_gen = IndexGenerator() self._random_value_gen = RandomValueGenerator() self._binomial_crossing = BinomialCrossover() self._archive_reduction = ArchiveReduction() self._local_search_done = False def _update_population_size(self, nfe: int, max_nfe: int, start_pop_size: int, min_pop_size: int): """ Calculate new population size using Linear Population Size Reduction (LPSR). Parameters: - nfe (int): The current function evaluations. - max_nfe (int): The total number of function evaluations. - start_pop_size (int): The initial population size. - min_pop_size (int): The minimum population size. """ new_size = self.population_size_reduction_strategy.get_new_population_size( nfe, max_nfe, start_pop_size, min_pop_size ) self._pop.resize(new_size) def _mutate(self, population: Population, the_best_to_select_table: List[int], f_table: List[float]) -> Population: """ Perform mutation step for the population in LSHADE-EpSin. Parameters: - population (Population): The population to mutate. - the_best_to_select_table (List[int]): List of the number of the best members to select. - f_table (List[float]): List of scaling factors for mutation. Returns: A new Population with mutated members. """ new_members = [] pa = np.concatenate((population.members, self._archive)) for i in range(population.size): r1 = self._index_gen.generate_unique(len(population.members), [i]) # Archive is included population and archive members r2 = self._index_gen.generate_unique(len(pa), [i, r1]) best_members = population.get_best_members(the_best_to_select_table[i]) best = best_members[np.random.randint(0, len(best_members))] mutant = MutationCurrentToPBest1.mutate( base_member=population.members[i], best_member=best, r1=population.members[r1], r2=pa[r2], f=f_table[i] ) new_members.append(mutant) return Population.with_new_members(population, new_members) def _selection(self, origin, modified, ftable, cr_table, freg_values): """ Perform selection operation for the population. Parameters: - origin (Population): The original population. - modified (Population): The modified population - ftable (List[float]): List of scaling factors for mutation. - cr_table (List[float]): List of crossover rates. - freg_values (dict): Dictionary of frequency values for members. Returns: A new population with the selected members. """ new_members = [] for i in range(origin.size): if (origin.optimization == OptimizationType.MINIMIZATION and origin.members[i] <= modified.members[i]) or \ (origin.optimization == OptimizationType.MAXIMIZATION and origin.members[i] >= modified.members[i]): new_members.append(copy.deepcopy(origin.members[i])) else: self._archive.append(copy.deepcopy(origin.members[i])) self._success_f.append(ftable[i]) self._success_cr.append(cr_table[i]) df = abs(modified.members[i].fitness_value - origin.members[i].fitness_value) self._difference_fitness_success.append(df) if i in freg_values: self._difference_fitness_success_freg.append(df) self._success_freg.append(ftable[i]) new_members.append(copy.deepcopy(modified.members[i])) return Population.with_new_members(origin, new_members) def _reset_success_metrics(self): """Reset success metrics after memory update.""" self._success_f = [] self._success_cr = [] self._success_freg = [] self._difference_fitness_success = [] self._difference_fitness_success_freg = [] def _update_memory(self, success_f, success_cr, success_freg, df, df_freg): """ Update the memory for the crossover rates, freg and scaling factors based on the success of the trial vectors. Parameters: - success_f (List[float]): List of scaling factors that led to better trial vectors. - success_cr (List[float]): List of crossover rates that led to better trial vectors. - success_freg (List[float]): List of freg values that led to better trial vectors. - df (List[float]): List of differences in objective function values (first part of evaluations) (|f(u_k, G) - f(x_k, G)|). - df_freg (List[float]): List of differences in objective function values for freg (second part of evaluations) (|f(u_k, G) - f(x_k, G)|). """ is_generation_lower_than_half_population = self.nfe < (self.nfe_max / 2) if is_generation_lower_than_half_population: # frequency update only in the first half of the generations if len(success_freg) > 0: total = np.sum(df_freg) weights = df_freg / total denominator = np.sum(weights * success_freg) if denominator > 0: freg_new = np.sum(weights * success_freg * success_freg) / np.sum(weights * success_freg) freg_new = np.clip(freg_new, 0, 1) random_index = np.random.randint(0, self._H) self._freq_memory[random_index] = freg_new if len(success_f) > 0 and len(success_cr) > 0: total = np.sum(df) weights = df / total if np.isclose(total, 0.0, atol=self._EPSILON): self._memory_Cr[self._k_index] = self._TERMINAL else: cr_new = np.sum(weights * success_cr * success_cr) / np.sum(weights * success_cr) cr_new = np.clip(cr_new, 0, 1) self._memory_Cr[self._k_index] = cr_new else: if len(success_f) > 0 and len(success_cr) > 0: total = np.sum(df) weights = df / total if np.isclose(total, 0.0, atol=self._EPSILON): self._memory_Cr[self._k_index] = self._TERMINAL else: cr_new = np.sum(weights * success_cr * success_cr) / np.sum(weights * success_cr) cr_new = np.clip(cr_new, 0, 1) self._memory_Cr[self._k_index] = cr_new f_new = np.sum(weights * success_f * success_f) / np.sum(weights * success_f) f_new = np.clip(f_new, 0, 1) self._memory_F[self._k_index] = f_new self._k_index = (self._k_index + 1) % self._H self._reset_success_metrics() def _initialize_parameters_for_epoch(self): """ Initialize the parameters for the next epoch of the LSHADE-EpSin algorithm. Parameters: f_table: List of scaling factors for mutation. cr_table: List of crossover rates. the_bests_to_select: List of the number of the best members to select because in crossover we need to select freg_values: Dictionary of frequency values for members. the best members from the population for one factor. Returns: - f_table (List[float]): List of scaling factors for mutation. - cr_table (List[float]): List of crossover rates. - bests_to_select (List[int]): List of the number of the best members to possibly select. - freg_values (dict): Dictionary of frequency values for members. """ f_table = [] cr_table = [] bests_to_select = [] freg_values = {} for idx in range(self._pop.size): ri = self._index_gen.generate(0, self._H) if np.isnan(self._memory_Cr[ri]): cr = 0.0 else: cr = self._random_value_gen.generate_normal(self._memory_Cr[ri], 0.1, 0.0, 1.0) is_lower_then_half_populations = self.nfe < (self.nfe_max / 2) if is_lower_then_half_populations: is_not_adaptive_sinusoidal_increasing = np.random.rand() < 0.5 if is_not_adaptive_sinusoidal_increasing: f = 0.5 * (math.sin(2 * math.pi * self._f_sin_freg * self.nfe + math.pi) * ( self.nfe_max - self.nfe) / self.nfe_max + 1.0) else: ri = np.random.randint(0, self._H) elem_freg_external_memory = self._freq_memory[ri] elem_cauchy = np.random.standard_cauchy() * 0.1 + elem_freg_external_memory elem_cauchy = np.clip(elem_cauchy, 0.0, 1.0) f = 0.5 * math.sin(2 * math.pi * elem_cauchy * self.nfe) * ( self.nfe / self.nfe_max) + 1 freg_values[idx] = f else: f = self._random_value_gen.generate_cauchy_greater_than_zero(self._memory_F[ri], 0.1, 0.0, 1.0) f_table.append(f) cr_table.append(cr) best_member_count = int(self._p * self.population_size) bests_to_select.append(best_member_count) return f_table, cr_table, bests_to_select, freg_values def _perform_local_search(self): """ Perform the Gaussian Walk local search phase as defined in the LSHADE-EpSin algorithm This procedure is executed once when the population size becomes very small (typically ≤ 20 individuals) in the later stage of evolution. It generates a small set of candidate solutions and refines them around the current best individual using a Gaussian-based local search mechanism. This approach allows the algorithm to exploit the best-found solution more precisely by performing small stochastic moves toward the global optimum, enhancing convergence when diversity is low. """ num_candidates = 10 G_ls = 250 candidates = Population( lb=self._pop.lb, ub=self._pop.ub, arg_num=self._pop.arg_num, size=num_candidates, optimization=self._pop.optimization ) candidates.generate_population() candidates.update_fitness_values(self._function.eval, self.parallel_processing) best_individual = min(candidates.members, key=lambda x: x.fitness_value) \ if self._pop.optimization == OptimizationType.MINIMIZATION \ else max(candidates.members, key=lambda x: x.fitness_value) for gen in range(G_ls): new_candidates = [] for i, candidate in enumerate(candidates.members): x = [chromosome.real_value for chromosome in candidate.chromosomes] x_best = [chromosome.real_value for chromosome in best_individual.chromosomes] G = gen + 1 sigma = np.abs((np.log(G) / G) * (np.array(x_best) - np.array(x))) epsilon = np.random.rand(self._pop.arg_num) epsilon_hat = np.random.rand(self._pop.arg_num) x_new = np.array(x_best) + epsilon * (np.array(x) - np.array(x_best)) + epsilon_hat * sigma x_new = np.clip(x_new, self._pop.lb, self._pop.ub) new_candidate = copy.deepcopy(candidate) for j, value in enumerate(x_new): new_candidate.chromosomes[j].real_value = float(value) new_candidates.append(new_candidate) candidates.update_fitness_values(self._function.eval, self.parallel_processing) for i, candidate in enumerate(new_candidates): new_candidate = new_candidates[i] if ( self._pop.optimization == OptimizationType.MINIMIZATION and new_candidate.fitness_value < candidate.fitness_value) or \ ( self._pop.optimization == OptimizationType.MAXIMIZATION and new_candidate.fitness_value > candidate.fitness_value): candidates.members[i] = new_candidate candidates.update_fitness_values(self._function.eval, self.parallel_processing) worst_indices = sorted(range(len(self._pop.members)), key=lambda i: self._pop.members[i].fitness_value, reverse=(self._pop.optimization == OptimizationType.MINIMIZATION))[:num_candidates] for i, idx in enumerate(worst_indices): if (self._pop.optimization == OptimizationType.MINIMIZATION and candidates.members[i].fitness_value < self._pop.members[idx].fitness_value) or \ (self._pop.optimization == OptimizationType.MAXIMIZATION and candidates.members[i].fitness_value > self._pop.members[idx].fitness_value): self._pop.members[idx] = candidates.members[i]
[docs] def next_epoch(self): """ Perform the next epoch of the LSHADE-EpSin algorithm. """ f_table, cr_table, bests_to_select, freg_values = self._initialize_parameters_for_epoch() mutant = self._mutate(self._pop, bests_to_select, f_table) trial = self._binomial_crossing.crossover_population(self._pop, mutant, cr_table) fix_boundary_constraints_with_parent(self._pop, trial, self.boundary_constraints_fun) trial.update_fitness_values(self._function.eval, self.parallel_processing) self._pop = self._selection(self._pop, trial, f_table, cr_table, freg_values) self._archive = self._archive_reduction.reduce_archive(self._archive, self._archive_size, self.population_size) self._update_memory(self._success_f, self._success_cr, self._success_freg, self._difference_fitness_success, self._difference_fitness_success_freg) self._update_population_size(self.nfe, self.nfe_max, self._start_population_size, self._min_pop_size) if self._pop.size <= 20 and not self._local_search_done: self._local_search_done = True self._perform_local_search()