# !/usr/bin/env python
# Created by "Thieu" at 08:58, 16/03/2020 ----------%
# Email: nguyenthieu2102@gmail.com %
# Github: https://github.com/thieu1995 %
# --------------------------------------------------%
import numpy as np
from math import gamma
from copy import deepcopy
from mealpy.utils.history import History
from mealpy.utils.problem import Problem
from mealpy.utils.termination import Termination
import concurrent.futures as parallel
import time
[docs]class Optimizer:
"""
The base class of all algorithms. All methods in this class will be inherited
Notes
~~~~~
+ The solve() is the most important method, trained the model
+ The parallel (multithreading or multiprocessing) is used in method: create_population(), update_fitness_population()
+ The general format of:
+ population = [agent_1, agent_2, ..., agent_N]
+ agent = global_best = solution = [position, target]
+ target = [fitness value, objective_list]
+ objective_list = [obj_1, obj_2, ..., obj_M]
+ Access to the:
+ position of solution/agent: solution[0] or solution[self.ID_POS] or model.solution[model.ID_POS]
+ fitness: solution[1][0] or solution[self.ID_TAR][self.ID_FIT] or model.solution[model.ID_TAR][model.ID_FIT]
+ objective values: solution[1][1] or solution[self.ID_TAR][self.ID_OBJ] or model.solution[model.ID_TAR][model.ID_OBJ]
"""
ID_POS = 0 # Index of position/location of solution/agent
ID_TAR = 1 # Index of target list, (includes fitness value and objectives list)
ID_FIT = 0 # Index of target (the final fitness) in fitness
ID_OBJ = 1 # Index of objective list in target
EPSILON = 10E-10
def __init__(self, problem, kwargs):
"""
Args:
problem: an instance of Problem class or a dictionary
Examples:
problem = {
"fit_func": your objective function,
"lb": list of value
"ub": list of value
"minmax": "min" or "max"
"verbose": True or False
"n_dims": int (Optional)
"obj_weight": list weights for all your objectives (Optional, default = [1, 1, ...1])
}
"""
super(Optimizer, self).__init__()
self.epoch, self.pop_size, self.solution = None, None, None
self.mode, self._print_model = "sequential", ""
self.pop, self.g_best = None, None
self.__set_keyword_arguments(kwargs)
self.history = History()
if (not isinstance(problem, Problem)) and type(problem) == dict:
problem = Problem(**problem)
self.problem = problem
self.amend_position = self.problem.amend_position
self.termination_flag = False # Check if exist object or not
if "termination" in kwargs:
termination = kwargs["termination"]
if isinstance(termination, Termination):
self.termination = termination
elif type(termination) == dict:
self.termination = Termination(termination=termination)
else:
print("Please create and input your Termination dictionary or Termination object.")
exit(0)
self.termination_flag = True
if "name" in kwargs: self._print_model += f"Model: {kwargs['name']}, "
if "fit_name" in kwargs: self._print_model += f"Func: {kwargs['fit_name']}, "
self.nfe_per_epoch = self.pop_size
self.sort_flag = False
def __set_keyword_arguments(self, kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
[docs] def termination_start(self):
if self.termination_flag:
if self.termination.mode == 'TB':
self.count_terminate = time.time()
elif self.termination.mode == 'ES':
self.count_terminate = 0
elif self.termination.mode == 'MG':
self.count_terminate = self.epoch
else: # number of function evaluation (NFE)
self.count_terminate = 0 # self.pop_size # First out of loop
else:
pass
[docs] def initialization(self):
self.pop = self.create_population(self.pop_size)
if self.sort_flag:
self.pop, self.g_best = self.get_global_best_solution(self.pop) # We sort the population
else:
_, self.g_best = self.get_global_best_solution(self.pop) # We don't sort the population
[docs] def before_evolve(self, epoch):
pass
[docs] def after_evolve(self, epoch):
pass
[docs] def solve(self, mode='sequential'):
"""
Args:
mode (str): 'sequential', 'thread', 'process'.
* 'sequential': recommended for simple and small task (< 10 seconds for calculating objective)
* 'thread': recommended for IO bound task, or small computing task (< 2 minutes for calculating objective)
* 'process': recommended for hard and big task (> 2 minutes for calculating objective)
Returns:
list: [position, fitness value]
"""
self.mode = mode
self.termination_start()
self.initialization()
self.history.save_initial_best(self.g_best)
for epoch in range(0, self.epoch):
time_epoch = time.time()
## Call before evolve function
self.before_evolve(epoch)
## Evolve method will be called in child class
self.evolve(epoch)
## Call after evolve function
self.after_evolve(epoch)
# update global best position
if self.sort_flag:
self.pop, self.g_best = self.update_global_best_solution(self.pop) # We sort the population
else:
_, self.g_best = self.update_global_best_solution(self.pop) # We don't sort the population
## Additional information for the framework
time_epoch = time.time() - time_epoch
self.history.list_epoch_time.append(time_epoch)
self.history.list_population.append(deepcopy(self.pop))
self.print_epoch(epoch + 1, time_epoch)
if self.termination_flag:
if self.termination.mode == 'TB':
if time.time() - self.count_terminate >= self.termination.quantity:
self.termination.logging(self.problem.verbose)
break
elif self.termination.mode == 'FE':
self.count_terminate += self.nfe_per_epoch
if self.count_terminate >= self.termination.quantity:
self.termination.logging(self.problem.verbose)
break
elif self.termination.mode == 'MG':
if (epoch+1) >= self.termination.quantity:
self.termination.logging(self.problem.verbose)
break
else: # Early Stopping
temp = self.count_terminate + self.history.get_global_repeated_times(self.ID_TAR, self.ID_FIT, self.EPSILON)
if temp >= self.termination.quantity:
self.termination.logging(self.problem.verbose)
break
## Additional information for the framework
self.save_optimization_process()
return self.solution[self.ID_POS], self.solution[self.ID_TAR][self.ID_FIT]
[docs] def evolve(self, epoch):
pass
[docs] def create_solution(self):
"""
To get the position, target wrapper [fitness and obj list]
+ A[self.ID_POS] --> Return: position
+ A[self.ID_TAR] --> Return: [fitness, [obj1, obj2, ...]]
+ A[self.ID_TAR][self.ID_FIT] --> Return: fitness
+ A[self.ID_TAR][self.ID_OBJ] --> Return: [obj1, obj2, ...]
Returns:
list: wrapper of solution with format [position, [fitness, [obj1, obj2, ...]]]
"""
position = np.random.uniform(self.problem.lb, self.problem.ub)
position = self.amend_position(position)
fitness = self.get_fitness_position(position=position)
return [position, fitness]
[docs] def create_population(self, pop_size=None):
"""
Args:
pop_size (int): number of solutions
Returns:
list: population or list of solutions/agents
"""
if pop_size is None:
pop_size = self.pop_size
pop = []
if self.mode == "thread":
with parallel.ThreadPoolExecutor() as executor:
list_executors = [executor.submit(self.create_solution) for _ in range(pop_size)]
# This method yield the result everytime a thread finished their job (not by order)
for f in parallel.as_completed(list_executors):
pop.append(f.result())
elif self.mode == "process":
with parallel.ProcessPoolExecutor() as executor:
list_executors = [executor.submit(self.create_solution) for _ in range(pop_size)]
# This method yield the result everytime a cpu finished their job (not by order).
for f in parallel.as_completed(list_executors):
pop.append(f.result())
else:
pop = [self.create_solution() for _ in range(0, pop_size)]
return pop
[docs] def update_fitness_population(self, pop=None):
"""
Args:
pop (list): the population
Returns:
list: population with updated fitness value
"""
if self.mode == "thread":
with parallel.ThreadPoolExecutor() as executor:
list_results = executor.map(self.get_fitness_solution, pop) # Return result not the future object
for idx, fit in enumerate(list_results):
pop[idx][self.ID_TAR] = fit
elif self.mode == "process":
with parallel.ProcessPoolExecutor() as executor:
list_results = executor.map(self.get_fitness_solution, pop) # Return result not the future object
for idx, fit in enumerate(list_results):
pop[idx][self.ID_TAR] = fit
else:
for idx, agent in enumerate(pop):
pop[idx][self.ID_TAR] = self.get_fitness_solution(agent)
return pop
[docs] def get_fitness_position(self, position=None):
"""
Args:
position (nd.array): 1-D numpy array
Returns:
[target, [obj1, obj2, ...]]
"""
objs = self.problem.fit_func(position)
if not self.problem.obj_is_list:
objs = [objs]
fit = np.dot(objs, self.problem.obj_weight)
return [fit, objs]
[docs] def get_fitness_solution(self, solution=None):
"""
Args:
solution (list): A solution with format [position, [target, [obj1, obj2, ...]]]
Returns:
[target, [obj1, obj2, ...]]
"""
return self.get_fitness_position(solution[self.ID_POS])
[docs] def get_global_best_solution(self, pop: list):
"""
Sort population and return the sorted population and the best solution
Args:
pop (list): The population of pop_size individuals
Returns:
Sorted population and global best solution
"""
sorted_pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT]) # Already returned a new sorted list
if self.problem.minmax == "min":
return sorted_pop, deepcopy(sorted_pop[0])
else:
return sorted_pop, deepcopy(sorted_pop[-1])
[docs] def get_better_solution(self, agent1: list, agent2: list):
"""
Args:
agent1 (list): A solution
agent2 (list): Another solution
Returns:
The better solution between them
"""
if self.problem.minmax == "min":
if agent1[self.ID_TAR][self.ID_FIT] < agent2[self.ID_TAR][self.ID_FIT]:
return deepcopy(agent1)
return deepcopy(agent2)
else:
if agent1[self.ID_TAR][self.ID_FIT] < agent2[self.ID_TAR][self.ID_FIT]:
return deepcopy(agent2)
return deepcopy(agent1)
[docs] def compare_agent(self, agent_a: list, agent_b: list):
"""
Args:
agent_a (list): Solution a
agent_b (list): Solution b
Returns:
boolean: Return True if solution a better than solution b and otherwise
"""
if self.problem.minmax == "min":
if agent_a[self.ID_TAR][self.ID_FIT] < agent_b[self.ID_TAR][self.ID_FIT]:
return True
return False
else:
if agent_a[self.ID_TAR][self.ID_FIT] < agent_b[self.ID_TAR][self.ID_FIT]:
return False
return True
[docs] def get_special_solutions(self, pop=None, best=3, worst=3):
"""
Args:
pop (list): The population
best (int): Top k1 best solutions, default k1=3, it can be None
worst (int): Top k2 worst solutions, default k2=3, it can be None
Returns:
list: sorted_population, k1 best solutions and k2 worst solutions
"""
if self.problem.minmax == "min":
pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT])
else:
pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT], reverse=True)
if best is None:
if worst is None:
exit(0)
else:
return pop, None, deepcopy(pop[-worst:])
else:
if worst is None:
return pop, deepcopy(pop[:best]), None
else:
return pop, deepcopy(pop[:best]), deepcopy(pop[-worst:])
[docs] def get_special_fitness(self, pop=None):
"""
Args:
pop (list): The population
Returns:
list: Total fitness, best fitness, worst fitness
"""
total_fitness = np.sum([agent[self.ID_TAR][self.ID_FIT] for agent in pop])
if self.problem.minmax == "min":
pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT])
else:
pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT], reverse=True)
return total_fitness, pop[0][self.ID_TAR][self.ID_FIT], pop[-1][self.ID_TAR][self.ID_FIT]
[docs] def update_global_best_solution(self, pop=None, save=True):
"""
Update the global best solution saved in variable named: self.history_list_g_best
Args:
pop (list): The population of pop_size individuals
save (bool): True if you want to add new current global best and False if you just want update the current one.
Returns:
list: Sorted population and the global best solution
"""
if self.problem.minmax == "min":
sorted_pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT])
else:
sorted_pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT], reverse=True)
current_best = sorted_pop[0]
if save:
self.history.list_current_best.append(current_best)
better = self.get_better_solution(current_best, self.history.list_global_best[-1])
self.history.list_global_best.append(better)
return deepcopy(sorted_pop), deepcopy(better)
else:
local_better = self.get_better_solution(current_best, self.history.list_current_best[-1])
self.history.list_current_best[-1] = local_better
global_better = self.get_better_solution(current_best, self.history.list_global_best[-1])
self.history.list_global_best[-1] = global_better
return deepcopy(sorted_pop), deepcopy(global_better)
[docs] def print_epoch(self, epoch, runtime):
"""
Print out the detailed information of training process
Args:
epoch (int): current iteration
runtime (float): the runtime for current iteration
"""
if self.problem.verbose:
print(f"> {self._print_model}Epoch: {epoch}, Current best: {self.history.list_current_best[-1][self.ID_TAR][self.ID_FIT]}, "
f"Global best: {self.history.list_global_best[-1][self.ID_TAR][self.ID_FIT]}, Runtime: {runtime:.5f} seconds")
[docs] def save_optimization_process(self):
"""
Save important data for later use such as:
+ list_global_best_fit
+ list_current_best_fit
+ list_diversity
+ list_exploitation
+ list_exploration
"""
self.history.epoch = len(self.history.list_global_best)
self.history.list_global_best_fit = [agent[self.ID_TAR][self.ID_FIT] for agent in self.history.list_global_best]
self.history.list_current_best_fit = [agent[self.ID_TAR][self.ID_FIT] for agent in self.history.list_current_best]
# Draw the exploration and exploitation line with this data
self.history.list_diversity = np.ones(self.history.epoch)
for idx, pop in enumerate(self.history.list_population):
pos_matrix = np.array([agent[self.ID_POS] for agent in pop])
div = np.mean(abs((np.median(pos_matrix, axis=0) - pos_matrix)), axis=0)
self.history.list_diversity[idx] = np.mean(div, axis=0)
div_max = np.max(self.history.list_diversity)
self.history.list_exploration = 100 * (self.history.list_diversity / div_max)
self.history.list_exploitation = 100 - self.history.list_exploration
self.solution = self.history.list_global_best[-1]
## Selection techniques
[docs] def get_index_roulette_wheel_selection(self, list_fitness: np.array):
"""
This method can handle min/max problem, and negative or positive fitness value.
Args:
list_fitness (nd.array): 1-D numpy array
Returns:
int: Index of selected solution
"""
scaled_fitness = (list_fitness - np.min(list_fitness)) / (np.ptp(list_fitness) + self.EPSILON)
if self.problem.minmax == "min":
final_fitness = 1.0 - scaled_fitness
else:
final_fitness = scaled_fitness
total_sum = sum(final_fitness)
r = np.random.uniform(low=0, high=total_sum)
for idx, f in enumerate(final_fitness):
r = r + f
if r > total_sum:
return idx
return np.random.choice(range(0, len(list_fitness)))
[docs] def get_index_kway_tournament_selection(self, pop=None, k_way=0.2, output=2, reverse=False):
"""
Args:
pop: The population
k_way (float/int): The percent or number of solutions are randomized pick
output (int): The number of outputs
reverse (bool): set True when finding the worst fitness
Returns:
list: List of the selected indexes
"""
if 0 < k_way < 1:
k_way = int(k_way * len(pop))
list_id = np.random.choice(range(len(pop)), k_way, replace=False)
list_parents = [[idx, pop[idx][self.ID_TAR][self.ID_FIT]] for idx in list_id]
if self.problem.minmax == "min":
list_parents = sorted(list_parents, key=lambda agent: agent[1])
else:
list_parents = sorted(list_parents, key=lambda agent: agent[1], reverse=True)
if reverse:
return [parent[0] for parent in list_parents[-output:]]
return [parent[0] for parent in list_parents[:output]]
[docs] def get_levy_flight_step(self, beta=1.0, multiplier=0.001, case=0):
"""
Get the Levy-flight step size
Args:
beta (float): Should be in range [0, 2].
* 0-1: small range --> exploit
* 1-2: large range --> explore
multiplier (float): default = 0.001
case (int): Should be one of these value [0, 1, -1].
* 0: return multiplier * s * np.random.uniform()
* 1: return multiplier * s * np.random.normal(0, 1)
* -1: return multiplier * s
Returns:
int: The step size of Levy-flight trajectory
"""
# u and v are two random variables which follow np.random.normal distribution
# sigma_u : standard deviation of u
sigma_u = np.power(gamma(1 + beta) * np.sin(np.pi * beta / 2) / (gamma((1 + beta) / 2) * beta * np.power(2, (beta - 1) / 2)), 1 / beta)
# sigma_v : standard deviation of v
sigma_v = 1
u = np.random.normal(0, sigma_u ** 2)
v = np.random.normal(0, sigma_v ** 2)
s = u / np.power(abs(v), 1 / beta)
if case == 0:
step = multiplier * s * np.random.uniform()
elif case == 1:
step = multiplier * s * np.random.normal(0, 1)
else:
step = multiplier * s
return step
[docs] def levy_flight(self, epoch=None, position=None, g_best_position=None, step=0.001, case=0):
"""
Get the Levy-flight position of current agent
Args:
epoch (int): The current epoch/iteration
position: The position of current agent
g_best_position: The position of the global best solution
step (float): The step size in Levy-flight, default = 0.001
case (int): Should be one of these value [0, 1, 2]
Returns:
The Levy-flight position of current agent
"""
beta = 1
# muy and v are two random variables which follow np.random.normal distribution
# sigma_muy : standard deviation of muy
sigma_muy = np.power(gamma(1 + beta) * np.sin(np.pi * beta / 2) / (gamma((1 + beta) / 2) * beta * np.power(2, (beta - 1) / 2)), 1 / beta)
# sigma_v : standard deviation of v
sigma_v = 1
muy = np.random.normal(0, sigma_muy ** 2)
v = np.random.normal(0, sigma_v ** 2)
s = muy / np.power(abs(v), 1 / beta)
levy = np.random.uniform(self.problem.lb, self.problem.ub) * step * s * (position - g_best_position)
if case == 0:
return levy
elif case == 1:
return position + 1.0 / np.sqrt(epoch + 1) * np.sign(np.random.random() - 0.5) * levy
elif case == 2:
return position + np.random.normal(0, 1, len(self.problem.lb)) * levy
elif case == 3:
return position + 0.01 * levy
[docs] def get_global_best_global_worst_solution(self, pop=None):
"""
Args:
pop (list): The population
Returns:
list: The global best and the global worst solution
"""
# Already returned a new sorted list
sorted_pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT])
if self.problem.minmax == "min":
return deepcopy(sorted_pop[0]), deepcopy(sorted_pop[-1])
else:
return deepcopy(sorted_pop[-1]), deepcopy(sorted_pop[0])
### Survivor Selection
[docs] def greedy_selection_population(self, pop_old=None, pop_new=None):
"""
Args:
pop_old (list): The current population
pop_new (list): The next population
Returns:
The new population with better solutions
"""
len_old, len_new = len(pop_old), len(pop_new)
if len_old != len_new:
print("Pop old and Pop new should be the same length!")
exit(0)
if self.problem.minmax == "min":
return [pop_new[i] if pop_new[i][self.ID_TAR][self.ID_FIT] < pop_old[i][self.ID_TAR][self.ID_FIT]
else pop_old[i] for i in range(len_old)]
else:
return [pop_new[i] if pop_new[i][self.ID_TAR] > pop_old[i][self.ID_TAR]
else pop_old[i] for i in range(len_old)]
[docs] def get_sorted_strim_population(self, pop=None, pop_size=None, reverse=False):
"""
Args:
pop (list): The population
pop_size (int): The number of population
Returns:
The sorted population with pop_size size
"""
if self.problem.minmax == "min":
pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT], reverse=reverse)
else:
pop = sorted(pop, key=lambda agent: agent[self.ID_TAR][self.ID_FIT], reverse=reverse)
return pop[:pop_size]
[docs] def create_opposition_position(self, agent=None, g_best=None):
"""
Args:
agent: The current solution (agent)
g_best: the global best solution
Returns:
The opposite solution
"""
return self.problem.lb + self.problem.ub - g_best[self.ID_POS] + np.random.uniform() * (g_best[self.ID_POS] - agent[self.ID_POS])
### Crossover
[docs] def crossover_arthmetic_recombination(self, dad_pos=None, mom_pos=None):
r = np.random.uniform() # w1 = w2 when r =0.5
w1 = np.multiply(r, dad_pos) + np.multiply((1 - r), mom_pos)
w2 = np.multiply(r, mom_pos) + np.multiply((1 - r), dad_pos)
return w1, w2
#### Improved techniques can be used in any algorithms: 1
## Based on this paper: An efficient equilibrium optimizer with mutation strategy for numerical optimization (but still different)
## This scheme used after the original and including 4 step:
## s1: sort population, take p1 = 1/2 best population for next round
## s2: do the mutation for p1, using greedy method to select the better solution
## s3: do the search mechanism for p1 (based on global best solution and the updated p1 above), to make p2 population
## s4: construct the new population for next generation
[docs] def improved_ms(self, pop=None, g_best=None): ## m: mutation, s: search
pop_len = int(len(pop) / 2)
## Sort the updated population based on fitness
pop = sorted(pop, key=lambda item: item[self.ID_TAR])
pop_s1, pop_s2 = pop[:pop_len], pop[pop_len:]
## Mutation scheme
pop_new = []
for i in range(0, pop_len):
agent = deepcopy(pop_s1[i])
pos_new = pop_s1[i][self.ID_POS] * (1 + np.random.normal(0, 1, self.problem.n_dims))
agent[self.ID_POS] = self.amend_position(pos_new)
pop_new.append(agent)
pop_new = self.update_fitness_population(pop_new)
pop_s1 = self.greedy_selection_population(pop_s1, pop_new) ## Greedy method --> improved exploitation
## Search Mechanism
pos_s1_list = [item[self.ID_POS] for item in pop_s1]
pos_s1_mean = np.mean(pos_s1_list, axis=0)
pop_new = []
for i in range(0, pop_len):
agent = deepcopy(pop_s2[i])
pos_new = (g_best[self.ID_POS] - pos_s1_mean) - np.random.random() * \
(self.problem.lb + np.random.random() * (self.problem.ub - self.problem.lb))
agent[self.ID_POS] = self.amend_position(pos_new)
pop_new.append(agent)
## Keep the diversity of populatoin and still improved the exploration
pop_s2 = self.update_fitness_population(pop_new)
pop_s2 = self.greedy_selection_population(pop_s2, pop_new)
## Construct a new population
pop = pop_s1 + pop_s2
return pop