import pygad import numpy as np import random # Create a maze class global maze_ix def fitness_func(path, solution_idx): maze = mazes[maze_ix] fitness = np.sum(path * maze.punish_matrix.reshape(-1)) path = path.reshape(maze.punish_matrix.shape) if path[maze.start_pos] == 0: fitness -= 10000 if path[maze.end_pos] == 0: fitness -= 10000 if path[maze.start_pos] == 1 and path[maze.end_pos] == 1: fitness += 300 # Check if there is a valid path complete_path = maze.walk_through_maze(path) complete_path_len = len(complete_path) # Check how many treasures are found treasures_found = 0 for move in complete_path: if move in maze.treasures: treasures_found += 1 fitness += treasures_found * 1000 # Set the first path found as the shotest one if maze.shortest_path == [] and complete_path_len > 0 and treasures_found >= len(maze.treasures) // 2: maze.adjust_weights(complete_path) print('First path found') fitness += 1000 * len(maze.treasures) maze.shortest_path = complete_path #Check if the current path is shorter than the shortest one if complete_path_len != 0 and complete_path_len < len(maze.shortest_path) and treasures_found >= len(maze.treasures) // 2: print('Found a better path') fitness += 1000 * len(maze.treasures) maze.shortest_path = complete_path maze.adjust_weights(complete_path) maze.ga_iteration += 1 return fitness def on_mutation(generations, ga_instance): maze = mazes[maze_ix] no_wall_instances = np.where(maze.mutation_matrix.reshape(-1) == 1)[0] wall_instances = np.where(maze.mutation_matrix.reshape(-1) == 0)[0] treasure_instances = np.where(maze.mutation_matrix.reshape(-1) == 2)[0] cluster_instances = np.reshape(np.array(maze.clusters), -1) # Loop through the population for i in range(len(generations)): # randomly select random number of the instances where there are walls random_false_instances = np.random.choice(wall_instances, size=int(len(wall_instances)* random.uniform(0.01, 1.0)), replace=False) # randomly select random number of the instances where there are no walls random_true_instances = np.random.choice(no_wall_instances, size=int(len(no_wall_instances)* random.uniform(0.01, 1.0)), replace=False) # randomly select random number of the instances where there are treasures random_treasure_instances = np.random.choice(treasure_instances, size=int(len(treasure_instances)* random.uniform(0.01, 1.0)), replace=False) # randomly select random number of the instances where there are clusters random_cluster_instances = np.random.choice(cluster_instances, size=int(len(cluster_instances)* random.uniform(0.01, 1.0)), replace=False) generations[i][random_true_instances] = 1 generations[i][random_false_instances] = 0 generations[i][random_treasure_instances] = 1 generations[i][random_cluster_instances] = 1 return generations class Maze: def __init__(self, maze, start_pos, end_pos, punish_matrix, mutation_matrix, treasures, shortest_path): self.maze = maze self.start_pos = start_pos self.end_pos = end_pos self.punish_matrix = punish_matrix self.mutation_matrix = mutation_matrix self.treasures = treasures self.shortest_path = shortest_path self.ga_iteration = 0 self.initial_population_size = 1000 self.clusters = [] def run_genetic_algorithm(self): # Set global punish matrix punish_matrix = self.punish_matrix # Prepare treasure clusters self.locate_treasure_clusters() ga_instance = pygad.GA(num_genes=punish_matrix.size, num_generations=50, sol_per_pop=self.initial_population_size, num_parents_mating=200, gene_type=np.uint8, fitness_func=fitness_func, parent_selection_type="random", keep_parents=2, allow_duplicate_genes=True, parallel_processing=1, mutation_type=on_mutation, initial_population=self.generate_initial_population(), gene_space=[0, 1]) ga_instance.run() solution, solution_fitness, solution_idx = ga_instance.best_solution() print("The shortest path is", self.shortest_path, self.ga_iteration) print("The best solution is", solution.reshape(self.punish_matrix.shape)) self.print_shortest_path() def walk_through_maze(self, solution_matrix): queue = [[self.start_pos]] def add_to_queue(full_path, x, y): if (x,y) not in full_path: full_path = full_path.copy() full_path.append((x, y)) queue.append(full_path) def is_valid_move(x, y): return self.maze[x][y] == "." or self.maze[x][y] == "E" or self.maze[x][y] == "T" while queue != []: full_path = queue.pop() x, y = full_path[-1] if(self.maze[x][y] == 'E'): return full_path if x + 1 < len(self.maze) : if solution_matrix[x+1, y] == 1 and is_valid_move(x+1, y): add_to_queue(full_path, x+1, y) if x - 1 >= 0: if solution_matrix[x-1, y] == 1 and is_valid_move(x-1, y): add_to_queue(full_path, x-1, y) if y + 1 < len(self.maze) : if solution_matrix[x, y+1] == 1 and is_valid_move(x, y+1): add_to_queue(full_path, x, y+1) if y - 1 >= 0: if solution_matrix[x, y-1] == 1 and is_valid_move(x, y-1): add_to_queue(full_path, x, y-1) return [] def adjust_weights(self, found_path): for (x, y) in found_path: self.punish_matrix[x,y] += 100 def print_maze(self): for row in self.maze: print(' '.join(row)) def print_shortest_path(self): for (x, y) in self.shortest_path: if (x, y) == self.start_pos or (x, y) == self.end_pos: continue if (x, y) in self.treasures: continue lst = list(self.maze[x]) lst[y] = 'X' self.maze[x] = ''.join(lst) self.print_maze() def generate_initial_population(self): # Generate initial population # Firtly find the instances where there are no walls no_wall_instances = np.where(self.mutation_matrix.reshape(-1) == 1)[0] wall_instances = np.where(self.mutation_matrix.reshape(-1) == 0)[0] treasure_instances = np.where(self.mutation_matrix.reshape(-1) == 2)[0] cluster_instances = np.reshape(np.array(self.clusters), -1) initial_population = np.random.choice([0, 1], size=(self.initial_population_size, self.mutation_matrix.size)) for population in initial_population: # select random number of the instances where there are walls random_false_instances = np.random.choice(wall_instances, size=int(len(no_wall_instances)* random.uniform(0.5, 1.0)), replace=False) # Randomly select random number of the instances where there are no walls random_true_instances = np.random.choice(no_wall_instances, size=int(len(no_wall_instances)* random.uniform(0.1, 1.0)), replace=False) # Randomly select treasure instances random_treasure_instances = np.random.choice(treasure_instances, size=int(len(treasure_instances)* random.uniform(0.1, 1.0)), replace=False) # Randomly select cluster instances random_cluster_instances = np.random.choice(cluster_instances, size=int(len(cluster_instances)* random.uniform(0.1, 1.0)), replace=False) # Then apply those values to generation population[random_true_instances] = 1 population[random_false_instances] = 0 population[random_treasure_instances] = 1 population[random_cluster_instances] = 1 return initial_population def locate_treasure_clusters(self): # Find treasoure neighbours max_cluster_size = int(self.mutation_matrix.shape[0] * 4) clusters = [] for treasure in self.treasures: queue = [[treasure]] # Define add to queue function def add_to_queue(cluster, x, y): if (x,y) not in cluster: cluster = cluster.copy() cluster.append((x, y)) queue.append(cluster) # Deine valid move function def is_valid_move(x, y): return self.maze[x][y] == "." or self.maze[x][y] == "E" or self.maze[x][y] == "T" or self.maze[x][y] == "S" while queue != []: current_cluster = queue.pop() x, y = current_cluster[-1] # Add cluster to clusters if we have found a big enough one if len(current_cluster) >= max_cluster_size or (x, y) == self.end_pos or (x, y) == self.start_pos: clusters.append(current_cluster.copy()) continue # Add neighbours to cluster if x + 1 < len(self.maze) : if is_valid_move(x+1, y): add_to_queue(current_cluster, x+1, y) if x - 1 >= 0: if is_valid_move(x-1, y): add_to_queue(current_cluster, x-1, y) if y + 1 < len(self.maze) : if is_valid_move(x, y+1): add_to_queue(current_cluster, x, y+1) if y - 1 >= 0: if is_valid_move(x, y-1): add_to_queue(current_cluster, x, y-1) # Now prepare clusters for mutation mutation_clusters = clusters.copy() print(clusters) for i in range(len(clusters)): for j, (x, y) in enumerate(clusters[i]): mutation_clusters[i][j] = x * self.mutation_matrix.shape[0] + y # Convert to numpy array mut_clusters_np_array = [] for i in range(len(mutation_clusters)): for j in range(len(mutation_clusters[i])): mut_clusters_np_array.append(int(mutation_clusters[i][j])) mut_clusters_np_array = np.array(mut_clusters_np_array) self.clusters = mut_clusters_np_array def read_mazes(): with open('./mazes_treasures.txt', 'r') as f: mazes = [] maze = [] for line in f: if line == '\n': mazes.append(maze) maze = [] continue maze.append(line.strip()) return mazes def prepare_maze(maze_ix, mazes): maze = mazes[maze_ix] punish_matrix = np.zeros((len(maze), len(maze)), dtype=np.int64) mutation_matrix = np.zeros((len(maze), len(maze)), dtype=np.uint8) start_index = 0, 0 end_index = 0, 0 treasures = [] # Initialize punish matrix and find start and end index for i, x in enumerate(maze): for j, y in enumerate(x): if y == "#": punish_matrix[i, j] = -1000 mutation_matrix[i, j] = 0 if y == ".": punish_matrix[i, j] = +1000 mutation_matrix[i, j] = 1 if y == "S": start_index = i, j mutation_matrix[i, j] = 1 if y == "E": end_index = i, j mutation_matrix[i, j] = 1 if y == "T": punish_matrix[i, j] = +20000 mutation_matrix[i, j] = 2 treasures.append((i, j)) # Create maze class maze = Maze(maze, start_index, end_index, punish_matrix, mutation_matrix, treasures, []) return maze def main(): # Read mazes global maze_ix, mazes mazes = [] text_mazes = read_mazes() for i in range(len(text_mazes)): print('MAZE: ', i) maze_ix = i maze = prepare_maze(i, text_mazes) mazes.append(maze) maze.run_genetic_algorithm() if __name__ == "__main__": main()