burtenshaw (HF Staff) committed (verified)
Commit be46a16 · 1 Parent(s): 29a7cec

Upload folder using huggingface_hub

src/envs/maze_env/client.py CHANGED
@@ -23,10 +23,17 @@ from .models import MazeAction, MazeObservation, MazeState
 if TYPE_CHECKING:
     pass
 
+
 class MazeEnv(HTTPEnvClient[MazeAction, MazeObservation]):
     """HTTP client for Maze Environment."""
 
-    def render_ascii_maze(self, maze: List[List[int]], position: List[int], start: List[int], goal: List[int]) -> None:
+    def render_ascii_maze(
+        self,
+        maze: List[List[int]],
+        position: List[int],
+        start: List[int],
+        goal: List[int],
+    ) -> None:
         """
         Render the maze grid as ASCII art in the terminal.
         - 0 = free cell
@@ -49,8 +56,8 @@ class MazeEnv(HTTPEnvClient[MazeAction, MazeObservation]):
                 line += "G "
             elif maze[r][c] == 1:
                 line += "█ "
-            elif r == rows-1 and c == cols-1:
-                line+= "E "
+            elif r == rows - 1 and c == cols - 1:
+                line += "E "
             else:
                 line += ". "
             print(line)
@@ -82,4 +89,4 @@ class MazeEnv(HTTPEnvClient[MazeAction, MazeObservation]):
             episode_id=payload.get("episode_id", ""),
             step_count=payload.get("step_count", 0),
             done=payload.get("done", False),
-        )
+        )
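For reference, a minimal standalone sketch of the ASCII-rendering logic this hunk reformats. The markers for the agent and start cells are not visible in the diff, so "A" and "S" below are assumptions; the "G", "█", "E" and "." branches follow the lines shown above.

from typing import List


def render_ascii(maze: List[List[int]], position: List[int], start: List[int], goal: List[int]) -> None:
    rows, cols = len(maze), len(maze[0])
    for r in range(rows):
        line = ""
        for c in range(cols):
            if [r, c] == position:
                line += "A "  # agent marker (assumed)
            elif [r, c] == start:
                line += "S "  # start marker (assumed)
            elif [r, c] == goal:
                line += "G "  # goal, as in the hunk above
            elif maze[r][c] == 1:
                line += "█ "  # wall
            elif r == rows - 1 and c == cols - 1:
                line += "E "  # bottom-right exit
            else:
                line += ". "  # free cell
        print(line)


# Example: 3x3 maze with one wall; agent at the start, goal at the bottom-right.
render_ascii(
    [[0, 1, 0], [0, 0, 0], [0, 0, 0]],
    position=[0, 0],
    start=[0, 0],
    goal=[2, 2],
)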
src/envs/maze_env/models.py CHANGED
@@ -29,6 +29,7 @@ class MazeObservation(Observation):
     total_reward: float
     legal_actions: List[int] = field(default_factory=list)
 
+
 @dataclass
 class MazeState(State):
     episode_id: str
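Based on the fields visible here and on how maze_environment.py constructs these dataclasses, a hedged construction example (values are illustrative, and the import path assumes the repository root is on PYTHONPATH):

from src.envs.maze_env.models import MazeObservation, MazeState

# The nested position mirrors Maze.__observe(), which returns [[col, row]]
# before conversion to a plain list.
obs = MazeObservation(position=[[0, 0]], total_reward=0.0, legal_actions=[1, 3])
state = MazeState(episode_id="episode_1", step_count=0, done=False)
print(obs, state)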
src/envs/maze_env/server/__init__.py CHANGED
@@ -8,4 +8,4 @@
 from .maze import Maze, Status
 from .maze_environment import MazeEnvironment
 
-__all__ = ["Maze","MazeEnvironment","Status"]
+__all__ = ["Maze", "MazeEnvironment", "Status"]
src/envs/maze_env/server/app.py CHANGED
@@ -28,10 +28,11 @@ from core.env_server import create_app
 from ..models import MazeAction, MazeObservation
 from .maze_environment import MazeEnvironment
 from .mazearray import maze
+
 # Get game configuration from environment variables
 
 # Create the environment instance
-env = MazeEnvironment(maze_array=maze,start_cell=(0,0),exit_cell=(7,7))
+env = MazeEnvironment(maze_array=maze, start_cell=(0, 0), exit_cell=(7, 7))
 
 # Create the FastAPI app with web interface and README integration
 app = create_app(env, MazeAction, MazeObservation, env_name="maze_env")
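Since create_app returns a FastAPI application object, one common way to serve it locally is with uvicorn. A hedged sketch; the module path depends on how the package is installed, and host/port are arbitrary choices:

import uvicorn

from src.envs.maze_env.server.app import app

if __name__ == "__main__":
    # Serve the maze environment API locally.
    uvicorn.run(app, host="0.0.0.0", port=8000)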
src/envs/maze_env/server/maze.py CHANGED
@@ -39,52 +39,73 @@ class Status(Enum):
 
 
 class Maze:
-    """ A maze with walls. An agent is placed at the start cell and must find the exit cell by moving through the maze.
-
-    The layout of the maze and the rules how to move through it are called the environment. An agent is placed
-    at start_cell. The agent chooses actions (move left/right/up/down) in order to reach the exit_cell. Every
-    action results in a reward or penalty which are accumulated during the game. Every move gives a small
-    penalty (-0.05), returning to a cell the agent visited earlier a bigger penalty (-0.25) and running into
-    a wall a large penalty (-0.75). The reward (+10.0) is collected when the agent reaches the exit. The
-    game always reaches a terminal state; the agent either wins or looses. Obviously reaching the exit means
-    winning, but if the penalties the agent is collecting during play exceed a certain threshold the agent is
-    assumed to wander around clueless and looses.
-
-    A note on cell coordinates:
-    The cells in the maze are stored as (col, row) or (x, y) tuples. (0, 0) is the upper left corner of the maze.
-    This way of storing coordinates is in line with what matplotlib's plot() function expects as inputs. The maze
-    itself is stored as a 2D numpy array so cells are accessed via [row, col]. To convert a (col, row) tuple
-    to (row, col) use (col, row)[::-1]
+    """A maze with walls. An agent is placed at the start cell and must find the exit cell by moving through the maze.
+
+    The layout of the maze and the rules how to move through it are called the environment. An agent is placed
+    at start_cell. The agent chooses actions (move left/right/up/down) in order to reach the exit_cell. Every
+    action results in a reward or penalty which are accumulated during the game. Every move gives a small
+    penalty (-0.05), returning to a cell the agent visited earlier a bigger penalty (-0.25) and running into
+    a wall a large penalty (-0.75). The reward (+10.0) is collected when the agent reaches the exit. The
+    game always reaches a terminal state; the agent either wins or looses. Obviously reaching the exit means
+    winning, but if the penalties the agent is collecting during play exceed a certain threshold the agent is
+    assumed to wander around clueless and looses.
+
+    A note on cell coordinates:
+    The cells in the maze are stored as (col, row) or (x, y) tuples. (0, 0) is the upper left corner of the maze.
+    This way of storing coordinates is in line with what matplotlib's plot() function expects as inputs. The maze
+    itself is stored as a 2D numpy array so cells are accessed via [row, col]. To convert a (col, row) tuple
+    to (row, col) use (col, row)[::-1]
     """
-    actions = [Action.MOVE_LEFT, Action.MOVE_RIGHT, Action.MOVE_UP, Action.MOVE_DOWN]  # all possible actions
+
+    actions = [
+        Action.MOVE_LEFT,
+        Action.MOVE_RIGHT,
+        Action.MOVE_UP,
+        Action.MOVE_DOWN,
+    ]  # all possible actions
 
     reward_exit = 10.0  # reward for reaching the exit cell
-    penalty_move = -0.05  # penalty for a move which did not result in finding the exit cell
+    penalty_move = (
+        -0.05
+    )  # penalty for a move which did not result in finding the exit cell
     penalty_visited = -0.25  # penalty for returning to a cell which was visited earlier
-    penalty_impossible_move = -0.75  # penalty for trying to enter an occupied cell or moving out of the maze
+    penalty_impossible_move = (
+        -0.75
+    )  # penalty for trying to enter an occupied cell or moving out of the maze
 
     def __init__(self, maze, start_cell=(0, 0), exit_cell=None):
-        """ Create a new maze game.
+        """Create a new maze game.
 
-        :param numpy.array maze: 2D array containing empty cells (= 0) and cells occupied with walls (= 1)
-        :param tuple start_cell: starting cell for the agent in the maze (optional, else upper left)
-        :param tuple exit_cell: exit cell which the agent has to reach (optional, else lower right)
+        :param numpy.array maze: 2D array containing empty cells (= 0) and cells occupied with walls (= 1)
+        :param tuple start_cell: starting cell for the agent in the maze (optional, else upper left)
+        :param tuple exit_cell: exit cell which the agent has to reach (optional, else lower right)
         """
         self.maze = maze
 
-        self.__minimum_reward = -0.5 * self.maze.size  # stop game if accumulated reward is below this threshold
+        self.__minimum_reward = (
+            -0.5 * self.maze.size
+        )  # stop game if accumulated reward is below this threshold
 
         nrows, ncols = self.maze.shape
         self.cells = [(col, row) for col in range(ncols) for row in range(nrows)]
-        self.empty = [(col, row) for col in range(ncols) for row in range(nrows) if self.maze[row, col] == Cell.EMPTY]
+        self.empty = [
+            (col, row)
+            for col in range(ncols)
+            for row in range(nrows)
+            if self.maze[row, col] == Cell.EMPTY
+        ]
         self.__exit_cell = (ncols - 1, nrows - 1) if exit_cell is None else exit_cell
         self.empty.remove(self.__exit_cell)
 
         # Check for impossible maze layout
         if self.__exit_cell not in self.cells:
-            raise Exception("Error: exit cell at {} is not inside maze".format(self.__exit_cell))
+            raise Exception(
+                "Error: exit cell at {} is not inside maze".format(self.__exit_cell)
+            )
         if self.maze[self.__exit_cell[::-1]] == Cell.OCCUPIED:
-            raise Exception("Error: exit cell at {} is not free".format(self.__exit_cell))
+            raise Exception(
+                "Error: exit cell at {} is not free".format(self.__exit_cell)
+            )
 
         # Variables for rendering using Matplotlib
         self.__render = Render.NOTHING  # what to render
@@ -94,17 +115,21 @@ class Maze:
         self.reset(start_cell)
 
     def reset(self, start_cell=(0, 0)):
-        """ Reset the maze to its initial state and place the agent at start_cell.
+        """Reset the maze to its initial state and place the agent at start_cell.
 
-        :param tuple start_cell: here the agent starts its journey through the maze (optional, else upper left)
-        :return: new state after reset
+        :param tuple start_cell: here the agent starts its journey through the maze (optional, else upper left)
+        :return: new state after reset
         """
         if start_cell not in self.cells:
-            raise Exception("Error: start cell at {} is not inside maze".format(start_cell))
+            raise Exception(
+                "Error: start cell at {} is not inside maze".format(start_cell)
+            )
         if self.maze[start_cell[::-1]] == Cell.OCCUPIED:
             raise Exception("Error: start cell at {} is not free".format(start_cell))
         if start_cell == self.__exit_cell:
-            raise Exception("Error: start- and exit cell cannot be the same {}".format(start_cell))
+            raise Exception(
+                "Error: start- and exit cell cannot be the same {}".format(start_cell)
+            )
 
         self.__previous_cell = self.__current_cell = start_cell
         self.__total_reward = 0.0  # accumulated reward
@@ -119,10 +144,18 @@ class Maze:
         self.__ax1.set_yticks(np.arange(0.5, ncols, step=1))
         self.__ax1.set_yticklabels([])
         self.__ax1.grid(True)
-        self.__ax1.plot(*self.__current_cell, "rs", markersize=30)  # start is a big red square
-        self.__ax1.text(*self.__current_cell, "Start", ha="center", va="center", color="white")
-        self.__ax1.plot(*self.__exit_cell, "gs", markersize=30)  # exit is a big green square
-        self.__ax1.text(*self.__exit_cell, "Exit", ha="center", va="center", color="white")
+        self.__ax1.plot(
+            *self.__current_cell, "rs", markersize=30
+        )  # start is a big red square
+        self.__ax1.text(
+            *self.__current_cell, "Start", ha="center", va="center", color="white"
+        )
+        self.__ax1.plot(
+            *self.__exit_cell, "gs", markersize=30
+        )  # exit is a big green square
+        self.__ax1.text(
+            *self.__exit_cell, "Exit", ha="center", va="center", color="white"
+        )
         self.__ax1.imshow(self.maze, cmap="binary")
         self.__ax1.get_figure().canvas.draw()
         self.__ax1.get_figure().canvas.flush_events()
@@ -130,35 +163,43 @@ class Maze:
         return self.__observe()
 
     def __draw(self):
-        """ Draw a line from the agents previous cell to its current cell. """
-        self.__ax1.plot(*zip(*[self.__previous_cell, self.__current_cell]), "bo-")  # previous cells are blue dots
+        """Draw a line from the agents previous cell to its current cell."""
+        self.__ax1.plot(
+            *zip(*[self.__previous_cell, self.__current_cell]), "bo-"
+        )  # previous cells are blue dots
         self.__ax1.plot(*self.__current_cell, "ro")  # current cell is a red dot
        self.__ax1.get_figure().canvas.draw()
         self.__ax1.get_figure().canvas.flush_events()
 
     def step(self, action):
-        """ Move the agent according to 'action' and return the new state, reward and game status.
+        """Move the agent according to 'action' and return the new state, reward and game status.
 
-        :param Action action: the agent will move in this direction
-        :return: state, reward, status
+        :param Action action: the agent will move in this direction
+        :return: state, reward, status
         """
         reward = self.__execute(action)
         self.__total_reward += reward
         status = self.__status()
         state = self.__observe()
-        logging.debug("action: {:10s} | reward: {: .2f} | status: {}".format(Action(action).name, reward, status))
+        logging.debug(
+            "action: {:10s} | reward: {: .2f} | status: {}".format(
+                Action(action).name, reward, status
+            )
+        )
         return state, reward, status
 
     def __execute(self, action):
-        """ Execute action and collect the reward or penalty.
+        """Execute action and collect the reward or penalty.
 
-        :param Action action: direction in which the agent will move
-        :return float: reward or penalty which results from the action
+        :param Action action: direction in which the agent will move
+        :return float: reward or penalty which results from the action
         """
         possible_actions = self.__possible_actions(self.__current_cell)
 
         if not possible_actions:
-            reward = self.__minimum_reward - 1  # cannot move anywhere, force end of game
+            reward = (
+                self.__minimum_reward - 1
+            )  # cannot move anywhere, force end of game
         elif action in possible_actions:
            col, row = self.__current_cell
             if action == Action.MOVE_LEFT:
@@ -179,21 +220,27 @@ class Maze:
             if self.__current_cell == self.__exit_cell:
                 reward = Maze.reward_exit  # maximum reward when reaching the exit cell
             elif self.__current_cell in self.__visited:
-                reward = Maze.penalty_visited  # penalty when returning to a cell which was visited earlier
+                reward = (
+                    Maze.penalty_visited
+                )  # penalty when returning to a cell which was visited earlier
             else:
-                reward = Maze.penalty_move  # penalty for a move which did not result in finding the exit cell
+                reward = (
+                    Maze.penalty_move
+                )  # penalty for a move which did not result in finding the exit cell
 
             self.__visited.add(self.__current_cell)
         else:
-            reward = Maze.penalty_impossible_move  # penalty for trying to enter an occupied cell or move out of the maze
+            reward = (
+                Maze.penalty_impossible_move
+            )  # penalty for trying to enter an occupied cell or move out of the maze
 
         return reward
 
     def __possible_actions(self, cell=None):
-        """ Create a list with all possible actions from 'cell', avoiding the maze's edges and walls.
+        """Create a list with all possible actions from 'cell', avoiding the maze's edges and walls.
 
-        :param tuple cell: location of the agent (optional, else use current cell)
-        :return list: all possible actions
+        :param tuple cell: location of the agent (optional, else use current cell)
+        :return list: all possible actions
         """
         if cell is None:
             col, row = self.__current_cell
@@ -206,42 +253,48 @@ class Maze:
         nrows, ncols = self.maze.shape
         if row == 0 or (row > 0 and self.maze[row - 1, col] == Cell.OCCUPIED):
             possible_actions.remove(Action.MOVE_UP)
-        if row == nrows - 1 or (row < nrows - 1 and self.maze[row + 1, col] == Cell.OCCUPIED):
+        if row == nrows - 1 or (
+            row < nrows - 1 and self.maze[row + 1, col] == Cell.OCCUPIED
+        ):
             possible_actions.remove(Action.MOVE_DOWN)
 
         if col == 0 or (col > 0 and self.maze[row, col - 1] == Cell.OCCUPIED):
             possible_actions.remove(Action.MOVE_LEFT)
-        if col == ncols - 1 or (col < ncols - 1 and self.maze[row, col + 1] == Cell.OCCUPIED):
+        if col == ncols - 1 or (
+            col < ncols - 1 and self.maze[row, col + 1] == Cell.OCCUPIED
+        ):
             possible_actions.remove(Action.MOVE_RIGHT)
 
         return possible_actions
 
     def __status(self):
-        """ Return the game status.
+        """Return the game status.
 
-        :return Status: current game status (WIN, LOSE, PLAYING)
+        :return Status: current game status (WIN, LOSE, PLAYING)
         """
         if self.__current_cell == self.__exit_cell:
             return Status.WIN
 
-        if self.__total_reward < self.__minimum_reward:  # force end of game after too much loss
+        if (
+            self.__total_reward < self.__minimum_reward
+        ):  # force end of game after too much loss
             return Status.LOSE
 
         return Status.PLAYING
 
     def __observe(self):
-        """ Return the state of the maze - in this game the agents current location.
+        """Return the state of the maze - in this game the agents current location.
 
-        :return numpy.array [1][2]: agents current location
+        :return numpy.array [1][2]: agents current location
         """
         return np.array([[*self.__current_cell]])
 
     def play(self, model, start_cell=(0, 0)):
-        """ Play a single game, choosing the next move based a prediction from 'model'.
+        """Play a single game, choosing the next move based a prediction from 'model'.
 
-        :param class AbstractModel model: the prediction model to use
-        :param tuple start_cell: agents initial cell (optional, else upper left)
-        :return Status: WIN, LOSE
+        :param class AbstractModel model: the prediction model to use
+        :param tuple start_cell: agents initial cell (optional, else upper left)
+        :return Status: WIN, LOSE
         """
         self.reset(start_cell)
 
@@ -254,9 +307,11 @@ class Maze:
         return status
 
     def check_win_all(self, model):
-        """ Check if the model wins from all possible starting cells. """
+        """Check if the model wins from all possible starting cells."""
         previous = self.__render
-        self.__render = Render.NOTHING  # avoid rendering anything during execution of the check games
+        self.__render = (
+            Render.NOTHING
+        )  # avoid rendering anything during execution of the check games
 
         win = 0
         lose = 0
@@ -269,14 +324,18 @@ class Maze:
 
         self.__render = previous  # restore previous rendering setting
 
-        logging.info("won: {} | lost: {} | win rate: {:.5f}".format(win, lose, win / (win + lose)))
+        logging.info(
+            "won: {} | lost: {} | win rate: {:.5f}".format(
+                win, lose, win / (win + lose)
+            )
+        )
 
         result = True if lose == 0 else False
 
         return result, win / (win + lose)
 
     def render_q(self, model):
-        """ Render the recommended action(s) for each cell as provided by 'model'.
+        """Render the recommended action(s) for each cell as provided by 'model'.
 
         :param class AbstractModel model: the prediction model to use
         """
@@ -293,8 +352,12 @@ class Maze:
         self.__ax2.set_yticks(np.arange(0.5, ncols, step=1))
         self.__ax2.set_yticklabels([])
         self.__ax2.grid(True)
-        self.__ax2.plot(*self.__exit_cell, "gs", markersize=30)  # exit is a big green square
-        self.__ax2.text(*self.__exit_cell, "Exit", ha="center", va="center", color="white")
+        self.__ax2.plot(
+            *self.__exit_cell, "gs", markersize=30
+        )  # exit is a big green square
+        self.__ax2.text(
+            *self.__exit_cell, "Exit", ha="center", va="center", color="white"
+        )
 
         for cell in self.empty:
             q = model.q(cell) if model is not None else [0, 0, 0, 0]
@@ -315,9 +378,18 @@ class Maze:
                # color (from red to green) represents the certainty of the preferred action(s)
                maxv = 1
                minv = -1
-                color = clip((q[action] - minv) / (maxv - minv))  # normalize in [-1, 1]
-
-                self.__ax2.arrow(*cell, dx, dy, color=(1 - color, color, 0), head_width=0.2, head_length=0.1)
+                color = clip(
+                    (q[action] - minv) / (maxv - minv)
+                )  # normalize in [-1, 1]
+
+                self.__ax2.arrow(
+                    *cell,
+                    dx,
+                    dy,
+                    color=(1 - color, color, 0),
+                    head_width=0.2,
+                    head_length=0.1,
+                )
 
         self.__ax2.imshow(self.maze, cmap="binary")
-        self.__ax2.get_figure().canvas.draw()
+        self.__ax2.get_figure().canvas.draw()
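A short usage sketch of the Maze API touched in this diff, assuming the names re-exported by server/__init__.py are importable; the 2x2 maze_array and the indices into Maze.actions are illustrative only.

import numpy as np

from src.envs.maze_env.server import Maze, Status

maze_array = np.array([[0, 0], [0, 0]])  # tiny wall-free maze for illustration
game = Maze(maze_array, start_cell=(0, 0), exit_cell=(1, 1))

state = game.reset((0, 0))
# Maze.actions is [MOVE_LEFT, MOVE_RIGHT, MOVE_UP, MOVE_DOWN]; move right, then down.
for action in (Maze.actions[1], Maze.actions[3]):
    state, reward, status = game.step(action)
    print(state, reward, status)  # per the docstring: -0.05 for a plain move, +10.0 on reaching the exit
    if status != Status.PLAYING:
        break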
src/envs/maze_env/server/maze_environment.py CHANGED
@@ -42,7 +42,7 @@ class MazeEnvironment(Environment):
         self,
         maze_array: np.ndarray,
         start_cell: Tuple[int, int] = (0, 0),
-        exit_cell: Optional[Tuple[int, int]] = (7,7),
+        exit_cell: Optional[Tuple[int, int]] = (7, 7),
     ):
         # Create underlying Maze instance (matches your working code)
         self.env = Maze(maze=maze_array, start_cell=start_cell, exit_cell=exit_cell)
@@ -54,16 +54,26 @@ class MazeEnvironment(Environment):
 
     def reset(self) -> MazeObservation:
         """Reset environment and return initial observation (MazeObservation)."""
-        observation = self.env.reset()  # typically returns np.array([row, col]) or similar
+        observation = (
+            self.env.reset()
+        )  # typically returns np.array([row, col]) or similar
         # initialize episode state
         self.state = MazeState(episode_id="episode_1", step_count=0, done=False)
 
         # build MazeObservation; convert numpy to list for JSON-serializable dataclass fields
-        pos_list = observation.tolist() if hasattr(observation, "tolist") else list(observation)
+        pos_list = (
+            observation.tolist()
+            if hasattr(observation, "tolist")
+            else list(observation)
+        )
         self.total_reward = 0
         legal_actions = self._compute_legal_actions(pos_list[0])
 
-        return MazeObservation(position=pos_list, total_reward=self.total_reward, legal_actions=legal_actions)
+        return MazeObservation(
+            position=pos_list,
+            total_reward=self.total_reward,
+            legal_actions=legal_actions,
+        )
 
     def step(self, action: MazeAction) -> MazeObservation:
         """
@@ -91,9 +101,9 @@ class MazeEnvironment(Environment):
         }
 
         # --- Reward settings ---
-        reward_exit = 10.0  # reward for reaching the exit cell
-        reward_move = 0.05  # reward for a move that didn't find the exit but is valid
-        penalty_visited = -0.25  # penalty for revisiting a cell
+        reward_exit = 10.0  # reward for reaching the exit cell
+        reward_move = 0.05  # reward for a move that didn't find the exit but is valid
+        penalty_visited = -0.25  # penalty for revisiting a cell
         penalty_impossible = -0.75  # penalty for invalid move (wall/outside)
 
         dr, dc = move_map.get(action.action, (0, 0))
@@ -153,10 +163,9 @@ class MazeEnvironment(Environment):
             position=pos_list,
             total_reward=self.total_reward,
             legal_actions=legal_actions,
-            done=done
+            done=done,
         )
 
-
     def state(self) -> Optional[MazeState]:
         """Return the current MazeState object."""
         return self.state
@@ -186,4 +195,4 @@ class MazeEnvironment(Environment):
         if col < ncols - 1 and self.env.maze[row, col + 1] == 0:
             actions.append(3)
 
-        return actions
+        return actions
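An end-to-end sketch of the direct (HTTP-free) usage implied by this module, assuming MazeAction carries an integer action field (as suggested by move_map.get(action.action, ...)) and that the repository root is on PYTHONPATH:

from src.envs.maze_env.models import MazeAction
from src.envs.maze_env.server.maze_environment import MazeEnvironment
from src.envs.maze_env.server.mazearray import maze

env = MazeEnvironment(maze_array=maze, start_cell=(0, 0), exit_cell=(7, 7))

obs = env.reset()
print(obs.position, obs.legal_actions)  # starting position and the moves allowed there

# Take the first legal action; step() returns an updated MazeObservation.
obs = env.step(MazeAction(action=obs.legal_actions[0]))
print(obs.position, obs.total_reward, obs.done)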
src/envs/maze_env/server/mazearray.py CHANGED
@@ -1,13 +1,15 @@
 import numpy as np
 
 # Maze
-maze = np.array([
-    [0, 1, 0, 0, 0, 0, 0, 0],
-    [0, 1, 0, 1, 0, 1, 0, 0],
-    [0, 0, 0, 1, 1, 0, 1, 0],
-    [0, 1, 0, 1, 0, 0, 0, 0],
-    [1, 0, 0, 1, 0, 1, 0, 0],
-    [0, 0, 0, 1, 0, 1, 1, 1],
-    [0, 1, 1, 0, 0, 0, 0, 0],
-    [0, 0, 0, 0, 0, 1, 0, 0]
-    ])
+maze = np.array(
+    [
+        [0, 1, 0, 0, 0, 0, 0, 0],
+        [0, 1, 0, 1, 0, 1, 0, 0],
+        [0, 0, 0, 1, 1, 0, 1, 0],
+        [0, 1, 0, 1, 0, 0, 0, 0],
+        [1, 0, 0, 1, 0, 1, 0, 0],
+        [0, 0, 0, 1, 0, 1, 1, 1],
+        [0, 1, 1, 0, 0, 0, 0, 0],
+        [0, 0, 0, 0, 0, 1, 0, 0],
+    ]
+)
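Following the coordinate note in maze.py above, cells are addressed as (col, row) tuples while the numpy array is indexed [row, col]; a quick sanity check on this 8x8 layout:

from src.envs.maze_env.server.mazearray import maze

exit_cell = (7, 7)  # (col, row), as used by Maze and MazeEnvironment
assert maze.shape == (8, 8)
assert maze[exit_cell[::-1]] == 0  # reverse to [row, col] for numpy indexing; exit must be free
assert maze[(1, 0)[::-1]] == 1     # cell (col=1, row=0) is a wall in the top row
assert maze[0, 0] == 0             # start cell (0, 0) is free
print("maze layout OK")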