2022-11-09 12:46:28 +01:00
|
|
|
import pygad
|
|
|
|
import numpy as np
|
2022-11-11 20:50:09 +01:00
|
|
|
import random
|
2022-11-12 19:11:03 +01:00
|
|
|
import time
|
2022-11-09 12:46:28 +01:00
|
|
|
|
2022-11-09 22:27:52 +01:00
|
|
|
# Module-level state shared with the pygad callbacks defined below.
# main() rebinds both names before each run; fitness_func() and on_mutation()
# look up the active maze as ``mazes[maze_ix]``.
# (A bare ``global`` statement at module scope has no effect, so the names are
# initialised explicitly instead.)
maze_ix = 0
mazes = []
|
2022-11-09 12:46:28 +01:00
|
|
|
|
|
|
|
def fitness_func(path, solution_idx):
    """Score one candidate chromosome for the maze currently being solved.

    The active maze is looked up as ``mazes[maze_ix]`` (module globals).

    NOTE(review): this two-argument signature is what older pygad releases
    expect; newer releases pass the GA instance as an additional first
    argument -- confirm against the installed pygad version.

    Args:
        path: Flat gene vector with one entry per cell of the maze's
            ``punish_matrix``; a value of 1 selects the cell.
        solution_idx: Index of the chromosome in the population (unused).

    Returns:
        The fitness score: the sum of the punish-matrix weights of the
        selected cells, minus 10000 for each of the start/end cells that is
        not selected, plus 300 when both are selected.

    Side effects:
        Increments ``maze.ga_iteration`` on every call.  When the candidate
        contains a walkable start-to-end route that is the first one found,
        or is shorter than ``maze.shortest_path``, the route is stored on the
        maze and ``maze.adjust_weights`` is called with it, which changes the
        weights used by subsequent evaluations.
    """
    maze = mazes[maze_ix]

    # Weighted sum over the selected cells (computed on the flat vector).
    fitness = np.sum(path * maze.punish_matrix.reshape(-1))

    grid = path.reshape(maze.punish_matrix.shape)

    if grid[maze.start_pos] == 0:
        fitness -= 10000
    if grid[maze.end_pos] == 0:
        fitness -= 10000

    if grid[maze.start_pos] == 1 and grid[maze.end_pos] == 1:
        fitness += 300

        # Check whether the selected cells contain a complete route.
        complete_path = maze.walk_through_maze(grid, critical_situation=False)

        if complete_path:
            is_first = not maze.shortest_path
            # The first route found becomes the reference; later routes only
            # replace it when they are strictly shorter.
            if is_first or len(complete_path) < len(maze.shortest_path):
                print('First path found' if is_first else 'Found a better path')
                maze.shortest_path = complete_path
                maze.adjust_weights(complete_path)

    maze.ga_iteration += 1
    return fitness
|
|
|
|
|
2022-11-11 20:50:09 +01:00
|
|
|
def on_mutation(generations, ga_instance):
    """Custom mutation operator: nudge every chromosome towards the maze mask.

    The active maze is looked up as ``mazes[maze_ix]`` (module globals).  For
    each chromosome a random subset of the open cells (mask value 1) is forced
    to 1 and a random subset of the wall cells (mask value 0) is forced to 0.
    Cells whose mask value is neither 0 nor 1 are left untouched.

    Args:
        generations: 2-D collection of chromosomes (one flat gene vector per
            row); it is modified in place.
        ga_instance: The running GA instance (unused).

    Returns:
        The same ``generations`` object, after mutation.
    """
    maze = mazes[maze_ix]

    # Firstly find the flat indices of the open cells and of the walls.
    flat_mask = maze.mutation_matrix.reshape(-1)
    no_wall_instances = np.where(flat_mask == 1)[0]
    wall_instances = np.where(flat_mask == 0)[0]
    open_count = len(no_wall_instances)
    wall_count = len(wall_instances)

    for chromosome in generations:
        # Pick a random number of wall cells.  The amount is scaled by the
        # open-cell count, so it is clamped to the number of walls: sampling
        # without replacement raises ValueError when asked for more items
        # than the population holds.
        clear_count = min(int(open_count * random.uniform(0.01, 1.0)), wall_count)
        random_false_instances = np.random.choice(
            wall_instances, size=clear_count, replace=False)

        # Then pick a random number of open cells.
        set_count = int(open_count * random.uniform(0.01, 1.0))
        random_true_instances = np.random.choice(
            no_wall_instances, size=set_count, replace=False)

        # Apply both selections to the chromosome.
        chromosome[random_true_instances] = 1
        chromosome[random_false_instances] = 0

    return generations
|
|
|
|
|
2022-11-09 22:27:52 +01:00
|
|
|
class Maze:
    """One text maze plus the state needed to solve it with a genetic algorithm.

    Attributes:
        maze: Sequence of row strings; 'E' marks the exit and '.' a free cell
            as far as the path walker is concerned.
        start_pos: (row, column) tuple where every walk begins.
        end_pos: (row, column) tuple of the exit.
        punish_matrix: Per-cell integer weights; ``adjust_weights`` raises the
            weight of cells that lie on a found path.
        mutation_matrix: Per-cell mask; 1 is treated as "no wall" and 0 as
            "wall" when seeding and mutating chromosomes.
        shortest_path: Best start-to-exit path recorded so far ([] if none).
        ga_iteration: Counter initialised to 0 (incremented by the fitness
            function elsewhere in this file).
        initial_population_size: Number of chromosomes per population.
        start_time / end_time: ``time.time()`` stamps around the GA run.
        logfile: Path of the experiment log written by ``log_experiment``.
    """

    def __init__(self, maze, start_pos, end_pos, punish_matrix, mutation_matrix, logfile, shortest_path):
        """Store the maze description and reset the run bookkeeping."""
        self.maze = maze
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.punish_matrix = punish_matrix
        self.mutation_matrix = mutation_matrix
        self.shortest_path = shortest_path
        self.ga_iteration = 0
        self.initial_population_size = 400
        self.start_time = 0
        self.end_time = 0
        self.logfile = logfile

    def run_genetic_algorithm(self):
        """Run pygad on this maze, then print the best path that was recorded.

        Uses the module-level ``fitness_func`` and ``on_mutation`` callbacks,
        which locate this maze through the ``mazes`` / ``maze_ix`` globals.
        """
        # The clock starts before the initial population is generated.
        self.start_time = time.time()

        ga_instance = pygad.GA(
            num_genes=self.punish_matrix.size,
            num_generations=3,
            sol_per_pop=self.initial_population_size,
            num_parents_mating=200,
            gene_type=np.uint8,
            fitness_func=fitness_func,
            parent_selection_type="random",
            keep_parents=2,
            allow_duplicate_genes=True,
            parallel_processing=1,
            mutation_type=on_mutation,
            initial_population=self.generate_initial_population(),
            gene_space=[0, 1],
        )
        ga_instance.run()
        self.end_time = time.time()

        # Experiment logging is currently disabled; call self.log_experiment()
        # here to re-enable it.

        # The result is not used: the interesting output is the path recorded
        # on this object by the fitness function.
        # NOTE(review): best_solution() may re-evaluate the population (and
        # therefore call fitness_func again) -- confirm before removing it.
        ga_instance.best_solution()

        print("The shortest path is", self.shortest_path, self.ga_iteration)
        self.print_shortest_path()

    def walk_through_maze(self, solution_matrix, critical_situation):
        """Search for a route from the start cell to the exit.

        The search is depth-first (the most recently discovered branch is
        expanded first) and returns the first route that reaches a cell whose
        maze character is 'E'; that route is not necessarily the shortest one
        contained in ``solution_matrix``.  A route never visits the same cell
        twice.

        Args:
            solution_matrix: 2-D array; only cells whose value equals 1 may
                be stepped on.
            critical_situation: When false, a step is additionally allowed
                only onto '.' or 'E' cells of the maze text; when true the
                maze text is ignored for stepping.

        Returns:
            List of (row, column) tuples from the start cell to the exit, or
            [] when no route exists.
        """
        # Rows and columns are bounded separately so that non-square mazes
        # work; columns are also bounded by the text row's own length.
        row_limit = min(solution_matrix.shape[0], len(self.maze))
        col_limit = solution_matrix.shape[1]
        walkable = (".", "E")

        pending = [[self.start_pos]]
        while pending:
            trail = pending.pop()
            x, y = trail[-1]
            if self.maze[x][y] == 'E':
                return trail

            # Neighbour order matters: it fixes which route is found first.
            for nx, ny in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
                if not 0 <= nx < row_limit:
                    continue
                text_row = self.maze[nx]
                if not 0 <= ny < min(col_limit, len(text_row)):
                    continue
                if solution_matrix[nx, ny] != 1:
                    continue
                if not critical_situation and text_row[ny] not in walkable:
                    continue
                if (nx, ny) in trail:
                    continue
                pending.append(trail + [(nx, ny)])

        return []

    def adjust_weights(self, found_path):
        """Add 2000 to the punish-matrix weight of every cell on ``found_path``."""
        for (x, y) in found_path:
            self.punish_matrix[x, y] += 2000

    def print_maze(self):
        """Print the maze, one row per line, with cells separated by spaces."""
        for row in self.maze:
            print(' '.join(row))

    def print_shortest_path(self):
        """Print the maze with every cell of ``shortest_path`` shown as 'X'.

        The overlay is drawn on a copy, so ``self.maze`` (including its start
        and exit markers) is left intact.
        """
        overlay = [list(row) for row in self.maze]
        for (x, y) in self.shortest_path:
            overlay[x][y] = 'X'
        for row in overlay:
            print(' '.join(row))

    def generate_initial_population(self):
        """Build the starting population for the GA.

        Every chromosome starts as uniformly random 0/1 genes; then a random
        subset of the open cells (mask value 1) is forced to 1 and a random
        subset of the wall cells (mask value 0) is forced to 0.

        Returns:
            Array of shape (initial_population_size, punish_matrix.size).
        """
        # Firstly find the flat indices of the open cells and of the walls.
        flat_mask = self.mutation_matrix.reshape(-1)
        no_wall_instances = np.where(flat_mask == 1)[0]
        wall_instances = np.where(flat_mask == 0)[0]
        open_count = len(no_wall_instances)
        wall_count = len(wall_instances)

        initial_population = np.random.choice(
            [0, 1], size=(self.initial_population_size, self.punish_matrix.size))

        for chromosome in initial_population:
            # The wall sample size is scaled by the open-cell count, so clamp
            # it: sampling without replacement raises ValueError when asked
            # for more items than there are walls.
            clear_count = min(int(open_count * random.uniform(0.5, 1.0)), wall_count)
            random_false_instances = np.random.choice(
                wall_instances, size=clear_count, replace=False)

            set_count = int(open_count * random.uniform(0.5, 1.0))
            random_true_instances = np.random.choice(
                no_wall_instances, size=set_count, replace=False)

            chromosome[random_true_instances] = 1
            chromosome[random_false_instances] = 0

        return initial_population

    def log_experiment(self):
        """Append one line to the log: maze index, elapsed seconds, shortest path.

        The maze index is read from the module-level ``maze_ix``.
        """
        elapsed = self.end_time - self.start_time
        # The context manager flushes and closes the file on exit.
        with open(self.logfile, 'a') as f:
            f.write(f"{maze_ix},{elapsed},{self.shortest_path}\n")
|
2022-11-09 22:27:52 +01:00
|
|
|
|
2022-11-09 12:46:28 +01:00
|
|
|
def read_mazes(path='./mazes_classic.txt'):
    """Read blank-line-separated text mazes from a file.

    Args:
        path: File to read; defaults to the file the script has always used.

    Returns:
        A list of mazes, each a list of row strings with surrounding
        whitespace stripped.  Runs of blank (or whitespace-only) lines count
        as a single separator, and a final maze is kept even when the file
        does not end with a blank line.
    """
    mazes = []
    maze = []
    with open(path, 'r') as f:
        for line in f:
            row = line.strip()
            if row:
                maze.append(row)
            elif maze:
                # Separator line: close the maze collected so far.
                mazes.append(maze)
                maze = []

    # The last maze has no separator after it when the file ends without a
    # trailing blank line.
    if maze:
        mazes.append(maze)
    return mazes
|
|
|
|
|
2022-11-12 19:11:03 +01:00
|
|
|
def prepare_maze(maze_ix, mazes, LOGFILE):
    """Build the ``Maze`` object for one text maze.

    Args:
        maze_ix: Index of the maze inside ``mazes``.
        mazes: List of text mazes (each a list of row strings).
        LOGFILE: Log path handed on to the ``Maze``.

    Returns:
        A ``Maze`` constructed with the text maze, the start and end
        coordinates, the punish and mutation matrices, the log path and an
        empty shortest path.

    Cell encoding:
        '#' -> punish -1000, mutation 0
        '.' -> punish +1000, mutation 1
        'S' -> start cell,   mutation 1
        'E' -> end cell,     mutation 1
        'T' -> mutation 2
        Any other character leaves both matrices at 0.  When the maze has no
        'S' or 'E', the corresponding coordinate stays at (0, 0).
    """
    maze = mazes[maze_ix]

    # The matrices get at least as many columns as rows (the shape this
    # function has always produced for square and tall mazes) and are widened
    # when a row is longer than that, so wide mazes no longer index out of
    # bounds.
    row_count = len(maze)
    col_count = max([row_count] + [len(row) for row in maze])
    punish_matrix = np.zeros((row_count, col_count), dtype=np.int64)
    mutation_matrix = np.zeros((row_count, col_count), dtype=np.uint8)

    start_index = 0, 0
    end_index = 0, 0

    # Initialise the matrices and find the start and end coordinates.
    for i, row in enumerate(maze):
        for j, cell in enumerate(row):
            if cell == "#":
                punish_matrix[i, j] = -1000
                mutation_matrix[i, j] = 0
            elif cell == ".":
                punish_matrix[i, j] = +1000
                mutation_matrix[i, j] = 1
            elif cell == "S":
                start_index = i, j
                mutation_matrix[i, j] = 1
            elif cell == "E":
                end_index = i, j
                mutation_matrix[i, j] = 1
            elif cell == "T":
                mutation_matrix[i, j] = 2

    return Maze(maze, start_index, end_index, punish_matrix, mutation_matrix, LOGFILE, [])
|
2022-11-12 19:11:03 +01:00
|
|
|
|
|
|
|
def prepare_log(LOGFILE):
    """Append the CSV header line to the experiment log.

    The column order matches the rows appended by ``Maze.log_experiment``:
    maze index first, then elapsed time, then the shortest path.

    Args:
        LOGFILE: Path of the log file; it is opened in append mode.
    """
    # The context manager flushes and closes the file on exit.
    with open(LOGFILE, 'a') as f:
        f.write('Maze,Time,Shortest path\n')
|
2022-11-09 22:27:52 +01:00
|
|
|
|
2022-11-09 12:46:28 +01:00
|
|
|
def main():
    """Solve every maze returned by ``read_mazes()`` in turn.

    For each maze the module globals ``maze_ix`` and ``mazes`` are updated
    first, because the pygad callbacks look up the active maze as
    ``mazes[maze_ix]``.
    """
    global maze_ix, mazes
    mazes = []

    text_mazes = read_mazes()
    logfile = 'log_t2_harder.txt'
    # Writing the log header is currently disabled; call prepare_log(logfile)
    # here to re-enable it.

    for i in range(len(text_mazes)):
        print('MAZE: ', i)
        maze_ix = i
        maze = prepare_maze(i, text_mazes, logfile)
        # Append before running so that mazes[maze_ix] is this maze.
        mazes.append(maze)
        maze.run_genetic_algorithm()
|
2022-11-09 12:46:28 +01:00
|
|
|
|
|
|
|
# Run the solver only when the file is executed as a script.
if __name__ == "__main__":
    main()
|