Source code for mealpy.system_based.WCA

#!/usr/bin/env python
# Created by "Thieu" at 09:55, 02/03/2021 ----------%
#       Email: nguyenthieu2102@gmail.com            %
#       Github: https://github.com/thieu1995        %
# --------------------------------------------------%

import numpy as np
from mealpy.optimizer import Optimizer


[docs]class OriginalWCA(Optimizer): """ The original version of: Water Cycle Algorithm (WCA) Links: 1. https://doi.org/10.1016/j.compstruc.2012.07.010 Notes ~~~~~ The ideas are (almost the same as ICO algorithm): + 1 sea is global best solution + a few river which are second, third, ... + other left are stream (will flow directed to sea or river) Hyper-parameters should fine-tune in approximate range to get faster convergence toward the global optimum: + nsr (int): [4, 10], Number of rivers + sea (sea = 1), default = 4 + wc (float): [1.0, 3.0], Weighting coefficient (C in the paper), default = 2 + dmax (float): [1e-6], fixed parameter, Evaporation condition constant, default=1e-6 Examples ~~~~~~~~ >>> import numpy as np >>> from mealpy import FloatVar, WCA >>> >>> def objective_function(solution): >>> return np.sum(solution**2) >>> >>> problem_dict = { >>> "bounds": FloatVar(n_vars=30, lb=(-10.,) * 30, ub=(10.,) * 30, name="delta"), >>> "minmax": "min", >>> "obj_func": objective_function >>> } >>> >>> model = WCA.OriginalWCA(epoch=1000, pop_size=50, nsr = 4, wc = 2.0, dmax = 1e-6) >>> g_best = model.solve(problem_dict) >>> print(f"Solution: {g_best.solution}, Fitness: {g_best.target.fitness}") >>> print(f"Solution: {model.g_best.solution}, Fitness: {model.g_best.target.fitness}") References ~~~~~~~~~~ [1] Eskandar, H., Sadollah, A., Bahreininejad, A. and Hamdi, M., 2012. Water cycle algorithm–A novel metaheuristic optimization method for solving constrained engineering optimization problems. Computers & Structures, 110, pp.151-166. """ def __init__(self, epoch: int = 10000, pop_size: int = 100, nsr: int = 4, wc: float = 2.0, dmax: float = 1e-6, **kwargs: object) -> None: """ Args: epoch (int): maximum number of iterations, default = 10000 pop_size (int): number of population size, default = 100 nsr (int): Number of rivers + sea (sea = 1), default = 4 wc (float): Weighting coefficient (C in the paper), default = 2.0 dmax (float): Evaporation condition constant, default=1e-6 """ super().__init__(**kwargs) self.epoch = self.validator.check_int("epoch", epoch, [1, 100000]) self.pop_size = self.validator.check_int("pop_size", pop_size, [5, 10000]) self.nsr = self.validator.check_int("nsr", nsr, [2, int(self.pop_size/2)]) self.wc = self.validator.check_float("wc", wc, (1.0, 3.0)) self.dmax = self.validator.check_float("dmax", dmax, (0, 1.0)) self.set_parameters(["epoch", "pop_size", "nsr", "wc", "dmax"]) self.sort_flag = True
[docs] def initialization(self): if self.pop is None: self.pop = self.generate_population(self.pop_size) self.pop = self.get_sorted_population(self.pop, self.problem.minmax) self.g_best = self.pop[0] self.ecc = self.dmax # Evaporation condition constant - variable n_stream = self.pop_size - self.nsr g_best = self.pop[0].copy() # Global best solution (sea) self.pop_best = self.pop[:self.nsr] # Including sea and river (1st solution is sea) self.pop_stream = self.pop[self.nsr:] # Forming Stream # Designate streams to rivers and sea cost_river_list = np.array([agent.target.fitness for agent in self.pop_best]) num_child_in_river_list = np.round(np.abs(cost_river_list / np.sum(cost_river_list)) * n_stream).astype(int) if np.sum(num_child_in_river_list) < n_stream: num_child_in_river_list[-1] += n_stream - np.sum(num_child_in_river_list) streams = {} idx_already_selected = [] for i in range(0, self.nsr - 1): streams[i] = [] idx_list = np.random.choice(list(set(range(0, n_stream)) - set(idx_already_selected)), num_child_in_river_list[i], replace=False).tolist() idx_already_selected += idx_list for idx in idx_list: streams[i].append(self.pop_stream[idx]) idx_last = list(set(range(0, n_stream)) - set(idx_already_selected)) streams[self.nsr - 1] = [] for idx in idx_last: streams[self.nsr - 1].append(self.pop_stream[idx]) self.streams = streams
[docs] def evolve(self, epoch): """ The main operations (equations) of algorithm. Inherit from Optimizer class Args: epoch (int): The current iteration """ # Update stream and river for idx, stream_list in self.streams.items(): # Update stream stream_new = [] for idx_stream, stream in enumerate(stream_list): pos_new = stream.solution + self.generator.uniform() * self.wc * (self.pop_best[idx].solution - stream.solution) pos_new = self.correct_solution(pos_new) agent = self.generate_empty_agent(pos_new) stream_new.append(agent) if self.mode not in self.AVAILABLE_MODES: stream_new[-1].target = self.get_target(pos_new) stream_new = self.update_target_for_population(stream_new) self.streams[idx] = stream_new stream_best = self.get_best_agent(stream_new, self.problem.minmax) if self.compare_target(stream_best.target, self.pop_best[idx].target, self.problem.minmax): self.pop_best[idx] = stream_best.copy() # Update river pos_new = self.pop_best[idx].solution + self.generator.uniform() * self.wc * (self.g_best.solution - self.pop_best[idx].solution) pos_new = self.correct_solution(pos_new) agent = self.generate_agent(pos_new) if self.compare_target(agent.target, self.pop_best[idx].target, self.problem.minmax): self.pop_best[idx] = agent # Evaporation for idx in range(1, self.nsr): distance = np.sqrt(np.sum((self.g_best.solution - self.pop_best[idx].solution) ** 2)) if distance < self.ecc or self.generator.random() < 0.1: child = self.generate_agent() pop_current_best = self.get_sorted_population(self.streams[idx] + [child], self.problem.minmax) self.pop_best[idx] = pop_current_best.pop(0) self.streams[idx] = pop_current_best self.pop = self.pop_best.copy() for idx, stream_list in self.streams.items(): self.pop += stream_list # Reduce the ecc self.ecc = self.ecc - self.ecc / self.epoch