"""Solve text mazes with a genetic algorithm (PyGAD).

Each gene of a GA solution is a 0/1 flag for one maze cell.  A solution is
rewarded for selecting open cells, penalised for selecting walls, and the
selected cells are walked from 'S' to 'E' to see whether they contain a
complete path.  The shortest complete path seen so far is kept on the
``Maze`` object and written to a log file when the run finishes.
"""
import time
from collections import deque

import numpy as np

# Values written into the punish matrix / applied to the fitness.
WALL_PENALTY = -1000
OPEN_CELL_REWARD = 700
PATH_BONUS = 700
MISSING_ENDPOINT_PENALTY = 10000
BOTH_ENDPOINTS_BONUS = 100
# After this many fitness evaluations without any complete path, the walker
# is allowed to step on any selected cell (walls included).
CRITICAL_AFTER_EVALUATIONS = 4000

# State shared with ``fitness_func``: the callback only receives the solution
# and its index, so it looks up the maze being solved through these globals.
maze_ix = 0
mazes = []


def fitness_func(path, solution_idx):
    """Score one GA solution for the maze ``mazes[maze_ix]``.

    NOTE(review): this is the two-argument callback form; PyGAD 2.20+ expects
    ``fitness_func(ga_instance, solution, solution_idx)`` - confirm against
    the installed PyGAD version.

    Args:
        path: flat 0/1 gene vector, one gene per punish-matrix cell.
        solution_idx: index of the solution in the population (unused).

    Returns:
        The fitness value (weighted sum of selected cells plus the start/end
        adjustments).

    Side effects:
        Increments ``maze.ga_iteration``.  When the selected cells contain a
        complete path that is the first one found, or shorter than the stored
        one, ``maze.shortest_path`` is replaced and the path cells are
        reinforced through ``maze.adjust_weights``.
    """
    maze = mazes[maze_ix]

    fitness = np.sum(path * maze.punish_matrix.reshape(-1))
    path = path.reshape(maze.punish_matrix.shape)

    start_selected = path[maze.start_pos] == 1
    end_selected = path[maze.end_pos] == 1
    if path[maze.start_pos] == 0:
        fitness -= MISSING_ENDPOINT_PENALTY
    if path[maze.end_pos] == 0:
        fitness -= MISSING_ENDPOINT_PENALTY
    if start_selected and end_selected:
        fitness += BOTH_ENDPOINTS_BONUS

    # Still nothing found after many evaluations: let the walker ignore walls.
    critical = (
        maze.ga_iteration >= CRITICAL_AFTER_EVALUATIONS
        and maze.shortest_path == []
    )

    # Check if there is a valid path
    complete_path = maze.walk_through_maze(path, critical_situation=critical)

    if complete_path:
        if not maze.shortest_path:
            # Set the first path found as the shortest one
            maze.adjust_weights(complete_path)
            print('First path found')
            maze.shortest_path = complete_path
        elif len(complete_path) < len(maze.shortest_path):
            # The current path is shorter than the shortest one so far
            print('Found a better path')
            maze.shortest_path = complete_path
            maze.adjust_weights(complete_path)

    maze.ga_iteration += 1
    return fitness


class Maze:
    """A text maze together with the state of its GA search.

    ``maze`` is a sequence of row strings using '#' for walls, '.' for open
    cells, 'S' for the start and 'E' for the exit.
    """

    def __init__(self, maze, start_pos, end_pos, punish_matrix, logfile,
                 shortest_path):
        self.maze = maze
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.punish_matrix = punish_matrix
        self.shortest_path = shortest_path
        self.ga_iteration = 0  # number of fitness evaluations so far
        self.start_time = 0
        self.end_time = 0
        self.logfile = logfile

    def run_genetic_algorithm(self):
        """Run the GA for this maze, time it, and append the result to the log."""
        # Imported here, at its only point of use, so the maze helpers stay
        # importable on machines without PyGAD installed.
        import pygad

        self.start_time = time.time()
        ga_instance = pygad.GA(num_genes=self.punish_matrix.size,
                               num_generations=1000,
                               sol_per_pop=20,
                               num_parents_mating=15,
                               gene_type=int,
                               crossover_type="two_points",
                               fitness_func=fitness_func,
                               parent_selection_type="tournament",
                               keep_parents=-1,
                               allow_duplicate_genes=True,
                               parallel_processing=4,
                               gene_space=[0, 1])
        ga_instance.run()
        self.end_time = time.time()

        # The reported result is the shortest path collected by fitness_func,
        # so the GA's own best solution is queried but not used.
        ga_instance.best_solution()
        self.log_experiment()

    def walk_through_maze(self, solution_matrix, critical_situation):
        """Return a shortest walk from start to 'E' over the selected cells.

        A cell may be entered when its entry in ``solution_matrix`` is 1 and
        it is '.' or 'E'; with ``critical_situation`` true the cell type is
        ignored, so walls may be crossed as well.

        The search is breadth-first with a visited map, so the returned walk
        has the minimum number of cells and the work is linear in the number
        of cells.

        Args:
            solution_matrix: 2-D 0/1 array indexed as ``[row, col]``.
            critical_situation: when true, any selected cell is walkable.

        Returns:
            List of ``(row, col)`` tuples from ``self.start_pos`` to the exit,
            or ``[]`` when the exit cannot be reached.
        """
        row_count = len(self.maze)

        def can_enter(x, y):
            if solution_matrix[x, y] != 1:
                return False
            return critical_situation or self.maze[x][y] in (".", "E")

        start = self.start_pos
        came_from = {start: None}  # doubles as the visited set
        frontier = deque([start])

        while frontier:
            x, y = frontier.popleft()

            if self.maze[x][y] == 'E':
                # Follow the parent links back to the start.
                walk = []
                cell = (x, y)
                while cell is not None:
                    walk.append(cell)
                    cell = came_from[cell]
                walk.reverse()
                return walk

            for nx, ny in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
                # Bound the column by the row's own length so non-square
                # mazes are handled.
                if not (0 <= nx < row_count and 0 <= ny < len(self.maze[nx])):
                    continue
                if (nx, ny) in came_from:
                    continue
                if can_enter(nx, ny):
                    came_from[(nx, ny)] = (x, y)
                    frontier.append((nx, ny))

        return []

    def adjust_weights(self, found_path):
        """Add ``PATH_BONUS`` to the punish matrix for every cell of ``found_path``."""
        for (x, y) in found_path:
            self.punish_matrix[x, y] += PATH_BONUS

    def print_maze(self):
        """Print the maze, one row per line, cells separated by spaces."""
        for row in self.maze:
            print(' '.join(row))

    def print_shortest_path(self):
        """Print the maze with every cell of the shortest path shown as 'X'.

        The overlay is drawn on a copy: ``self.maze`` is left untouched so the
        'S'/'E' markers remain available to later walks.
        """
        rows = [list(row) for row in self.maze]
        for (x, y) in self.shortest_path:
            rows[x][y] = 'X'
        for row in rows:
            print(' '.join(row))

    def log_experiment(self):
        """Append ``maze index,elapsed seconds,shortest path`` to the log file."""
        elapsed = self.end_time - self.start_time
        with open(self.logfile, 'a') as f:
            f.write(f"{maze_ix},{elapsed},{self.shortest_path}\n")


def read_mazes(path='./mazes_classic.txt'):
    """Read mazes from a text file in which blank lines separate mazes.

    The final maze is returned even when the file does not end with a blank
    line, and runs of blank lines do not produce empty mazes.

    Args:
        path: file to read; defaults to ``./mazes_classic.txt``.

    Returns:
        A list of mazes, each a list of stripped row strings.
    """
    parsed = []
    current = []
    with open(path, 'r') as f:
        for line in f:
            row = line.strip()
            if row:
                current.append(row)
            elif current:
                parsed.append(current)
                current = []
    if current:
        parsed.append(current)
    return parsed


def prepare_maze(maze_ix, mazes, LOGFILE):
    """Build the ``Maze`` object for ``mazes[maze_ix]``.

    The punish matrix has one entry per cell (rows x widest row): walls get
    ``WALL_PENALTY``, open cells ``OPEN_CELL_REWARD``, everything else 0.
    Start and end default to ``(0, 0)`` when no 'S' / 'E' is present.
    """
    maze = mazes[maze_ix]
    col_count = max((len(row) for row in maze), default=0)
    punish_matrix = np.zeros((len(maze), col_count), dtype=np.int64)
    start_index = 0, 0
    end_index = 0, 0

    # Initialize punish matrix and find start and end index
    for i, row in enumerate(maze):
        for j, cell in enumerate(row):
            if cell == "#":
                punish_matrix[i, j] = WALL_PENALTY
            elif cell == ".":
                punish_matrix[i, j] = OPEN_CELL_REWARD
            elif cell == "S":
                start_index = i, j
            elif cell == "E":
                end_index = i, j

    return Maze(maze, start_index, end_index, punish_matrix, LOGFILE, [])


def prepare_log(LOGFILE):
    """Append the header line to the log file."""
    with open(LOGFILE, 'a') as f:
        f.write('Maze,Time,Shortest path\n')


def main():
    """Solve every maze of the default maze file and log the results."""
    global maze_ix, mazes
    mazes = []

    # Read mazes
    text_mazes = read_mazes()
    LOGFILE = 'log_t1_classic.txt'
    prepare_log(LOGFILE)

    for i, _ in enumerate(text_mazes):
        print('MAZE: ', i)
        maze_ix = i
        maze = prepare_maze(i, text_mazes, LOGFILE)
        mazes.append(maze)
        maze.run_genetic_algorithm()


if __name__ == "__main__":
    main()