Source code for mealpy.optimizer

#!/usr/bin/env python
# Created by "Thieu" at 08:58, 16/03/2020 ----------%
#       Email: nguyenthieu2102@gmail.com            %
#       Github: https://github.com/thieu1995        %
# --------------------------------------------------%

import numpy as np
from typing import List, Union, Tuple, Dict
from mealpy.utils.agent import Agent
from mealpy.utils.problem import Problem
from math import gamma
from mealpy.utils.history import History
from mealpy.utils.target import Target
from mealpy.utils.termination import Termination
from mealpy.utils.logger import Logger
from mealpy.utils.validator import Validator
import concurrent.futures as parallel
from functools import partial
import os
import time


[docs]class Optimizer: """ The base class of all algorithms. All methods in this class will be inherited Notes ~~~~~ + The function solve() is the most important method, trained the model + The parallel (multithreading or multiprocessing) is used in method: generate_population(), update_target_for_population() + The general format of: + population = [agent_1, agent_2, ..., agent_N] + agent = [solution, target] + target = [fitness value, objective_list] + objective_list = [obj_1, obj_2, ..., obj_M] """ EPSILON = 10E-10 SUPPORTED_MODES = ["process", "thread", "swarm", "single"] AVAILABLE_MODES = ["process", "thread", "swarm"] PARALLEL_MODES = ["process", "thread"] SUPPORTED_ARRAYS = [list, tuple, np.ndarray] def __init__(self, **kwargs): super(Optimizer, self).__init__() self.epoch, self.pop_size = None, None self.mode, self.n_workers, self.name = None, None, None self.pop, self.g_best, self.g_worst = None, Agent(), None self.problem, self.logger, self.history = None, None, None self.__set_keyword_arguments(kwargs) self.validator = Validator(log_to="console", log_file=None) if self.name is None: self.name = self.__class__.__name__ self.sort_flag = False self.nfe_counter = -1 # The first one is tested in Problem class self.parameters, self.params_name_ordered = {}, None self.is_parallelizable = True def __set_keyword_arguments(self, kwargs): for key, value in kwargs.items(): setattr(self, key, value)
[docs] def set_parameters(self, parameters: Union[List, Tuple, Dict]) -> None: """ Set the parameters for current optimizer. if paras is a list of parameter's name, then it will set the default value in optimizer as current parameters if paras is a dict of parameter's name and value, then it will override the current parameters Args: parameters: The parameters """ if type(parameters) in (list, tuple): self.params_name_ordered = tuple(parameters) self.parameters = {} for name in parameters: self.parameters[name] = self.__dict__[name] if type(parameters) is dict: valid_para_names = set(self.parameters.keys()) new_para_names = set(parameters.keys()) if new_para_names.issubset(valid_para_names): for key, value in parameters.items(): setattr(self, key, value) self.parameters[key] = value else: raise ValueError(f"Invalid input parameters: {new_para_names} for {self.get_name()} optimizer. " f"Valid parameters are: {valid_para_names}.")
[docs] def get_parameters(self) -> Dict: """ Get parameters of optimizer. """ return self.parameters
[docs] def get_attributes(self) -> Dict: """ Get all attributes in optimizer. """ return self.__dict__
[docs] def get_name(self) -> str: """ Get name of the optimizer """ return self.name
def __str__(self): temp = "" for key in self.params_name_ordered: temp += f"{key}={self.parameters[key]}, " temp = temp[:-2] return f"{self.__class__.__name__}({temp})"
[docs] def initialize_variables(self): pass
[docs] def before_initialization(self, starting_solutions: Union[List, Tuple, np.ndarray] = None) -> None: """ Args: starting_solutions: The starting solutions (not recommended) """ if starting_solutions is None: pass elif type(starting_solutions) in self.SUPPORTED_ARRAYS and len(starting_solutions) == self.pop_size: if type(starting_solutions[0]) in self.SUPPORTED_ARRAYS and len(starting_solutions[0]) == self.problem.n_dims: self.pop = [self.generate_agent(solution) for solution in starting_solutions] else: raise ValueError("Invalid starting_solutions. It should be a list of positions or 2D matrix of positions only.") else: raise ValueError("Invalid starting_solutions. It should be a list/2D matrix of positions with same length as pop_size.")
[docs] def initialization(self) -> None: if self.pop is None: self.pop = self.generate_population(self.pop_size)
[docs] def after_initialization(self) -> None: # The initial population is sorted or not depended on algorithm's strategy pop_temp, best, worst = self.get_special_agents(self.pop, n_best=1, n_worst=1, minmax=self.problem.minmax) self.g_best, self.g_worst = best[0], worst[0] if self.sort_flag: self.pop = pop_temp ## Store initial best and worst solutions self.history.store_initial_best_worst(self.g_best, self.g_worst)
[docs] def before_main_loop(self): pass
[docs] def evolve(self, epoch: int) -> None: pass
[docs] def check_problem(self, problem, seed) -> None: if isinstance(problem, Problem): problem.set_seed(seed) self.problem = problem elif type(problem) == dict: problem["seed"] = seed self.problem = Problem(**problem) else: raise ValueError("problem needs to be a dict or an instance of Problem class.") self.generator = np.random.default_rng(seed) self.logger = Logger(self.problem.log_to, log_file=self.problem.log_file).create_logger(name=f"{self.__module__}.{self.__class__.__name__}") self.logger.info(self.problem.msg) self.history = History(log_to=self.problem.log_to, log_file=self.problem.log_file) self.pop, self.g_best, self.g_worst = None, None, None
[docs] def check_mode_and_workers(self, mode, n_workers): self.mode = self.validator.check_str("mode", mode, self.SUPPORTED_MODES) if self.mode in self.PARALLEL_MODES: if not self.is_parallelizable: self.logger.warning(f"{self.get_name()} doesn't support parallelization. The default mode 'single' is activated.") self.mode = "single" elif n_workers is not None: if self.mode == "process": self.n_workers = self.validator.check_int("n_workers", n_workers, [2, min(61, os.cpu_count() - 1)]) if self.mode == "thread": self.n_workers = self.validator.check_int("n_workers", n_workers, [2, min(32, os.cpu_count() + 4)]) else: self.logger.warning(f"The parallel mode: {self.mode} is selected. But n_workers is not set. The default n_workers = 4 is used.") self.n_workers = 4
[docs] def check_termination(self, mode="start", termination=None, epoch=None): if mode == "start": self.termination = termination if termination is not None: if isinstance(termination, Termination): self.termination = termination elif type(termination) == dict: self.termination = Termination(log_to=self.problem.log_to, log_file=self.problem.log_file, **termination) else: raise ValueError("Termination needs to be a dict or an instance of Termination class.") self.nfe_counter = 0 self.termination.set_start_values(0, self.nfe_counter, time.perf_counter(), 0) else: finished = False if self.termination is not None: es = self.history.get_global_repeated_times(self.termination.epsilon) finished = self.termination.should_terminate(epoch, self.nfe_counter, time.perf_counter(), es) if finished: self.logger.warning(self.termination.message) return finished
[docs] def solve(self, problem: Union[Dict, Problem] = None, mode: str = 'single', n_workers: int = None, termination: Union[Dict, Termination] = None, starting_solutions: Union[List, np.ndarray, Tuple] = None, seed: int = None) -> Agent: """ Args: problem: an instance of Problem class or a dictionary mode: Parallel: 'process', 'thread'; Sequential: 'swarm', 'single'. * 'process': The parallel mode with multiple cores run the tasks * 'thread': The parallel mode with multiple threads run the tasks * 'swarm': The sequential mode that no effect on updating phase of other agents * 'single': The sequential mode that effect on updating phase of other agents, this is default mode n_workers: The number of workers (cores or threads) to do the tasks (effect only on parallel mode) termination: The termination dictionary or an instance of Termination class starting_solutions: List or 2D matrix (numpy array) of starting positions with length equal pop_size parameter seed: seed for random number generation needed to be *explicitly* set to int value Returns: g_best: g_best, the best found agent, that hold the best solution and the best target. Access by: .g_best.solution, .g_best.target """ self.check_problem(problem, seed) self.check_mode_and_workers(mode, n_workers) self.check_termination("start", termination, None) self.initialize_variables() self.before_initialization(starting_solutions) self.initialization() self.after_initialization() self.before_main_loop() for epoch in range(1, self.epoch + 1): time_epoch = time.perf_counter() ## Evolve method will be called in child class self.evolve(epoch) # Update global best solution, the population is sorted or not depended on algorithm's strategy pop_temp, self.g_best = self.update_global_best_agent(self.pop) if self.sort_flag: self.pop = pop_temp time_epoch = time.perf_counter() - time_epoch self.track_optimize_step(self.pop, epoch, time_epoch) if self.check_termination("end", None, epoch): break self.track_optimize_process() return self.g_best
[docs] def track_optimize_step(self, pop: List[Agent] = None, epoch: int = None, runtime: float = None) -> None: """ Save some historical data and print out the detailed information of training process in each epoch Args: pop: the current population epoch: current iteration runtime: the runtime for current iteration """ ## Save history data if self.problem.save_population: self.history.list_population.append(Optimizer.duplicate_pop(pop)) self.history.list_epoch_time.append(runtime) self.history.list_global_best_fit.append(self.history.list_global_best[-1].target.fitness) self.history.list_current_best_fit.append(self.history.list_current_best[-1].target.fitness) # Save the exploration and exploitation data for later usage pos_matrix = np.array([agent.solution for agent in pop]) div = np.mean(np.abs(np.median(pos_matrix, axis=0) - pos_matrix), axis=0) self.history.list_diversity.append(np.mean(div, axis=0)) ## Print epoch self.logger.info(f">>>Problem: {self.problem.name}, Epoch: {epoch}, Current best: {self.history.list_current_best[-1].target.fitness}, " f"Global best: {self.history.list_global_best[-1].target.fitness}, Runtime: {runtime:.5f} seconds")
[docs] def track_optimize_process(self) -> None: """ Save some historical data after training process finished """ self.history.epoch = len(self.history.list_diversity) div_max = np.max(self.history.list_diversity) self.history.list_exploration = 100 * (np.array(self.history.list_diversity) / div_max) self.history.list_exploitation = 100 - self.history.list_exploration self.history.list_global_best = self.history.list_global_best[1:] self.history.list_current_best = self.history.list_current_best[1:] self.history.list_global_worst = self.history.list_global_worst[1:] self.history.list_current_worst = self.history.list_current_worst[1:]
[docs] def generate_empty_agent(self, solution: np.ndarray = None) -> Agent: """ Generate new agent with solution Args: solution (np.ndarray): The solution """ if solution is None: solution = self.problem.generate_solution(encoded=True) return Agent(solution=solution)
[docs] def generate_agent(self, solution: np.ndarray = None) -> Agent: """ Generate new agent with full information Args: solution (np.ndarray): The solution """ agent = self.generate_empty_agent(solution) agent.target = self.get_target(agent.solution) return agent
[docs] def generate_population(self, pop_size: int = None) -> List[Agent]: """ Args: pop_size (int): number of solutions Returns: list: population or list of solutions/agents """ if pop_size is None: pop_size = self.pop_size pop = [] if self.mode == "thread": with parallel.ThreadPoolExecutor(self.n_workers) as executor: list_executors = [executor.submit(self.generate_agent) for _ in range(pop_size)] # This method yield the result everytime a thread finished their job (not by order) for f in parallel.as_completed(list_executors): pop.append(f.result()) elif self.mode == "process": with parallel.ProcessPoolExecutor(self.n_workers) as executor: list_executors = [executor.submit(self.generate_agent) for _ in range(pop_size)] # This method yield the result everytime a cpu finished their job (not by order). for f in parallel.as_completed(list_executors): pop.append(f.result()) else: pop = [self.generate_agent() for _ in range(0, pop_size)] return pop
[docs] def amend_solution(self, solution: np.ndarray) -> np.ndarray: """ This function is based on optimizer's strategy. In each optimizer, this function can be overridden Args: solution: The position Returns: The valid solution based on optimizer's strategy """ return solution
[docs] def correct_solution(self, solution: np.ndarray) -> np.ndarray: """ This function is based on optimizer's strategy and problem-specific condition DO NOT override this function Args: solution: The position Returns: The correct solution that can be used to calculate target """ solution = self.amend_solution(solution) return self.problem.correct_solution(solution)
[docs] def update_target_for_population(self, pop: List[Agent] = None) -> List[Agent]: """ Update target for the input population Args: pop: the population of agents Returns: list: population with updated target value """ pos_list = [agent.solution for agent in pop] if self.mode == "thread": with parallel.ThreadPoolExecutor(self.n_workers) as executor: # Return result as original order, not the future object list_results = executor.map(partial(self.get_target, counted=False), pos_list) for idx, target in enumerate(list_results): pop[idx].target = target elif self.mode == "process": with parallel.ProcessPoolExecutor(self.n_workers) as executor: # Return result as original order, not the future object list_results = executor.map(partial(self.get_target, counted=False), pos_list) for idx, target in enumerate(list_results): pop[idx].target = target elif self.mode == "swarm": for idx, pos in enumerate(pos_list): pop[idx].target = self.get_target(pos, counted=False) else: return pop self.nfe_counter += len(pop) return pop
[docs] def get_target(self, solution: np.ndarray, counted: bool = True) -> Target: """ Get target value Args: solution: The real-value solution counted: Indicating the number of function evaluations is increasing or not Returns: The target value """ if counted: self.nfe_counter += 1 return self.problem.get_target(solution)
[docs] @staticmethod def compare_target(target_x: Target, target_y: Target, minmax: str = "min") -> bool: if minmax == "min": return True if target_x.fitness < target_y.fitness else False else: return False if target_x.fitness < target_y.fitness else True
[docs] @staticmethod def compare_fitness(fitness_x: Union[float, int], fitness_y: Union[float, int], minmax: str = "min") -> bool: if minmax == "min": return True if fitness_x < fitness_y else False else: return False if fitness_x < fitness_y else True
[docs] @staticmethod def duplicate_pop(pop: List[Agent]) -> List[Agent]: return [agent.copy() for agent in pop]
[docs] @staticmethod def get_sorted_population(pop: List[Agent], minmax: str = "min") -> List[Agent]: """ Get sorted population based on type (minmax) of problem Args: pop: The population minmax: The type of the problem Returns: Sorted population (1st agent is the best, last agent is the worst """ if minmax == "min": return sorted(pop, key=lambda agent: agent.target.fitness) else: return sorted(pop, key=lambda agent: agent.target.fitness, reverse=True)
[docs] @staticmethod def get_best_agent(pop: List[Agent], minmax: str = "min") -> Agent: """ Args: pop: The population of agents minmax: The type of problem Returns: The best agent """ pop = Optimizer.get_sorted_population(pop, minmax) return pop[0].copy()
[docs] @staticmethod def get_index_best(pop: List[Agent], minmax: str = "min") -> int: fit_list = np.array([agent.target.fitness for agent in pop]) if minmax == "min": return np.argmin(fit_list) else: return np.argmax(fit_list)
[docs] @staticmethod def get_worst_agent(pop: List[Agent], minmax: str = "min") -> Agent: """ Args: pop: The population of agents minmax: The type of problem Returns: The worst agent """ pop = Optimizer.get_sorted_population(pop, minmax) return pop[-1].copy()
[docs] @staticmethod def get_special_agents(pop: List[Agent] = None, n_best: int = 3, n_worst: int = 3, minmax: str = "min") -> Tuple[List[Agent], Union[List[Agent], None], Union[List[Agent], None]]: """ Get special agents include sorted population, n1 best agents, n2 worst agents Args: pop: The population n_best: Top n1 best agents, default n1=3, good level reduction n_worst: Top n2 worst agents, default n2=3, worst level reduction minmax: The problem type Returns: The sorted_population, n1 best agents and n2 worst agents """ pop = Optimizer.get_sorted_population(pop, minmax) if n_best is None: if n_worst is None: return pop, None, None else: return pop, None, [agent.copy() for agent in pop[::-1][:n_worst]] else: if n_worst is None: return pop, [agent.copy() for agent in pop[:n_best]], None else: return pop, [agent.copy() for agent in pop[:n_best]], [agent.copy() for agent in pop[::-1][:n_worst]]
[docs] @staticmethod def get_special_fitness(pop: List[Agent] = None, minmax: str = "min") -> Tuple[Union[float, np.ndarray], float, float]: """ Get special target include the total fitness, the best fitness, and the worst fitness Args: pop: The population minmax: The problem type Returns: The total fitness, the best fitness, and the worst fitness """ total_fitness = np.sum([agent.target.fitness for agent in pop]) pop = Optimizer.get_sorted_population(pop, minmax) return total_fitness, pop[0].target.fitness, pop[-1].target.fitness
[docs] @staticmethod def get_better_agent(agent_x: Agent, agent_y: Agent, minmax: str = "min", reverse: bool = False) -> Agent: """ Args: agent_x: First agent agent_y: Second agent minmax: The problem type reverse: Reverse the minmax Returns: The better agent based on fitness """ minmax_dict = {"min": 0, "max": 1} idx = minmax_dict[minmax] if reverse: idx = 1 - idx if idx == 0: return agent_x.copy() if agent_x.target.fitness < agent_y.target.fitness else agent_y.copy() else: return agent_y.copy() if agent_x.target.fitness < agent_y.target.fitness else agent_x.copy()
### Survivor Selection
[docs] @staticmethod def greedy_selection_population(pop_old: List[Agent] = None, pop_new: List[Agent] = None, minmax: str = "min") -> List[Agent]: """ Args: pop_old: The current population pop_new: The next population minmax: The problem type Returns: The new population with better solutions """ len_old, len_new = len(pop_old), len(pop_new) if len_old != len_new: raise ValueError("Greedy selection of two population with different length.") if minmax == "min": return [pop_new[idx] if pop_new[idx].target.fitness < pop_old[idx].target.fitness else pop_old[idx] for idx in range(len_old)] else: return [pop_new[idx] if pop_new[idx].target.fitness > pop_old[idx].target.fitness else pop_old[idx] for idx in range(len_old)]
[docs] @staticmethod def get_sorted_and_trimmed_population(pop: List[Agent] = None, pop_size: int = None, minmax: str = "min") -> List[Agent]: """ Args: pop: The population pop_size: The number of selected agents minmax: The problem type Returns: The sorted and trimmed population with pop_size size """ pop = Optimizer.get_sorted_population(pop, minmax) return pop[:pop_size]
[docs] def update_global_best_agent(self, pop: List[Agent], save: bool = True) -> Union[List, Tuple]: """ Update global best and current best solutions in history object. Also update global worst and current worst solutions in history object. Args: pop (list): The population of pop_size individuals save (bool): True if you want to add new current/global best to history, False if you just want to update current/global best Returns: list: Sorted population and the global best solution """ sorted_pop = self.get_sorted_population(pop, self.problem.minmax) c_best, c_worst = sorted_pop[0], sorted_pop[-1] if save: ## Save current best self.history.list_current_best.append(c_best) better = self.get_better_agent(c_best, self.history.list_global_best[-1], self.problem.minmax) self.history.list_global_best.append(better) ## Save current worst self.history.list_current_worst.append(c_worst) worse = self.get_better_agent(c_worst, self.history.list_global_worst[-1], self.problem.minmax, reverse=True) self.history.list_global_worst.append(worse) return sorted_pop, better else: ## Handle current best local_better = self.get_better_agent(c_best, self.history.list_current_best[-1], self.problem.minmax) self.history.list_current_best[-1] = local_better global_better = self.get_better_agent(c_best, self.history.list_global_best[-1], self.problem.minmax) self.history.list_global_best[-1] = global_better ## Handle current worst local_worst = self.get_better_agent(c_worst, self.history.list_current_worst[-1], self.problem.minmax, reverse=True) self.history.list_current_worst[-1] = local_worst global_worst = self.get_better_agent(c_worst, self.history.list_global_worst[-1], self.problem.minmax, reverse=True) self.history.list_global_worst[-1] = global_worst return sorted_pop, global_better
## Selection techniques
[docs] def get_index_roulette_wheel_selection(self, list_fitness: np.array): """ This method can handle min/max problem, and negative or positive fitness value. Args: list_fitness (nd.array): 1-D numpy array Returns: int: Index of selected solution """ if type(list_fitness) in [list, tuple, np.ndarray]: list_fitness = np.array(list_fitness).flatten() if list_fitness.ptp() == 0: return int(self.generator.integers(0, len(list_fitness))) if np.any(list_fitness) < 0: list_fitness = list_fitness - np.min(list_fitness) final_fitness = list_fitness if self.problem.minmax == "min": final_fitness = np.max(list_fitness) - list_fitness prob = final_fitness / np.sum(final_fitness) return int(self.generator.choice(range(0, len(list_fitness)), p=prob))
[docs] def get_index_kway_tournament_selection(self, pop: List = None, k_way: float = 0.2, output: int = 2, reverse: bool = False) -> List: """ Args: pop: The population k_way (float/int): The percent or number of solutions are randomized pick output (int): The number of outputs reverse (bool): set True when finding the worst fitness Returns: list: List of the selected indexes """ if 0 < k_way < 1: k_way = int(k_way * len(pop)) list_id = self.generator.choice(range(len(pop)), k_way, replace=False) list_parents = [[idx, pop[idx].target.fitness] for idx in list_id] if self.problem.minmax == "min": list_parents = sorted(list_parents, key=lambda agent: agent[1]) else: list_parents = sorted(list_parents, key=lambda agent: agent[1], reverse=True) if reverse: return [parent[0] for parent in list_parents[-output:]] return [parent[0] for parent in list_parents[:output]]
[docs] def get_levy_flight_step(self, beta: float = 1.0, multiplier: float = 0.001, size: Union[List, Tuple, np.ndarray] = None, case: int = 0) -> Union[float, List, np.ndarray]: """ Get the Levy-flight step size Args: beta (float): Should be in range [0, 2]. * 0-1: small range --> exploit * 1-2: large range --> explore multiplier (float): default = 0.001 size (tuple, list): size of levy-flight steps, for example: (3, 2), 5, (4, ) case (int): Should be one of these value [0, 1, -1]. * 0: return multiplier * s * self.generator.uniform() * 1: return multiplier * s * self.generator.normal(0, 1) * -1: return multiplier * s Returns: float, list, np.ndarray: The step size of Levy-flight trajectory """ # u and v are two random variables which follow self.generator.normal distribution # sigma_u : standard deviation of u sigma_u = np.power(gamma(1. + beta) * np.sin(np.pi * beta / 2) / (gamma((1 + beta) / 2.) * beta * np.power(2., (beta - 1) / 2)), 1. / beta) # sigma_v : standard deviation of v sigma_v = 1 size = 1 if size is None else size u = self.generator.normal(0, sigma_u ** 2, size) v = self.generator.normal(0, sigma_v ** 2, size) s = u / np.power(np.abs(v), 1 / beta) if case == 0: step = multiplier * s * self.generator.uniform() elif case == 1: step = multiplier * s * self.generator.normal(0, 1) else: step = multiplier * s return step[0] if size == 1 else step
[docs] def generate_opposition_solution(self, agent: Agent = None, g_best: Agent = None) -> np.ndarray: """ Args: agent: The current agent g_best: the global best agent Returns: The opposite solution """ pos_new = self.problem.lb + self.problem.ub - g_best.solution + self.generator.uniform() * (g_best.solution - agent.solution) return self.correct_solution(pos_new)
[docs] def generate_group_population(self, pop: List[Agent], n_groups: int, m_agents: int) -> List: """ Generate a list of group population from pop Args: pop: The current population n_groups: The n groups m_agents: The m agents in each group Returns: A list of group population """ pop_group = [] for idx in range(0, n_groups): group = pop[idx * m_agents: (idx + 1) * m_agents] pop_group.append([agent.copy() for agent in group]) return pop_group
### Crossover
[docs] def crossover_arithmetic(self, dad_pos=None, mom_pos=None): """ Args: dad_pos: position of dad mom_pos: position of mom Returns: list: position of 1st and 2nd child """ r = self.generator.uniform() # w1 = w2 when r =0.5 w1 = np.multiply(r, dad_pos) + np.multiply((1 - r), mom_pos) w2 = np.multiply(r, mom_pos) + np.multiply((1 - r), dad_pos) return w1, w2
#### Improved techniques can be used in any algorithms: 1 ## Based on this paper: An efficient equilibrium optimizer with mutation strategy for numerical optimization (but still different) ## This scheme used after the original and including 4 step: ## s1: sort population, take p1 = 1/2 best population for next round ## s2: do the mutation for p1, using greedy method to select the better solution ## s3: do the search mechanism for p1 (based on global best solution and the updated p1 above), to make p2 population ## s4: construct the new population for next generation
[docs] def improved_ms(self, pop=None, g_best=None): ## m: mutation, s: search pop_len = int(len(pop) / 2) ## Sort the updated population based on fitness pop = sorted(pop, key=lambda agent: agent.target.fitness) pop_s1, pop_s2 = pop[:pop_len], pop[pop_len:] ## Mutation scheme pop_new = [] for idx in range(0, pop_len): agent = pop_s1[idx].copy() pos_new = pop_s1[idx].solution * (1 + self.generator.normal(0, 1, self.problem.n_dims)) agent.solution = self.correct_solution(pos_new) pop_new.append(agent) pop_new = self.update_target_for_population(pop_new) pop_s1 = self.greedy_selection_population(pop_s1, pop_new, self.problem.minmax) ## Greedy method --> improved exploitation ## Search Mechanism pos_s1_list = [agent.solution for agent in pop_s1] pos_s1_mean = np.mean(pos_s1_list, axis=0) pop_new = [] for idx in range(0, pop_len): agent = pop_s2[idx].copy() pos_new = (g_best.solution - pos_s1_mean) - self.generator.random() * \ (self.problem.lb + self.generator.random() * (self.problem.ub - self.problem.lb)) agent.solution = self.correct_solution(pos_new) pop_new.append(agent) ## Keep the diversity of populatoin and still improved the exploration pop_s2 = self.update_target_for_population(pop_new) pop_s2 = self.greedy_selection_population(pop_s2, pop_new, self.problem.minmax) ## Construct a new population pop = pop_s1 + pop_s2 return pop