python-gym中box空间环境解读
python-gym中box空间环境解读
Box空间环境中使用的辅助函数:
- is_float_integer():判断变量是否为整数或浮点数。
- _broadcast():将标量扩充为给定的shape大小与dtype类型,并附带处理np.inf向整数类型转换的问题。
- get_precision():获取数据类型的精度。小知识:Python中整型的精度是无限的。
- _short_repr():将数组转换为一种简短的文本描述,一般用于print()或者文本化。
# Scalar type check: tells plain numbers apart from arrays and other objects.
def is_float_integer(var: Any) -> bool:
    """Return True when ``var`` is an integer or floating-point scalar.

    The decision is based on ``type(var)`` through ``np.issubdtype``, so Python
    ``int``/``float`` values and NumPy scalar types are both recognised.
    """
    var_type = type(var)
    return any(np.issubdtype(var_type, kind) for kind in (np.integer, np.floating))
5
6# 变量广播,实际上这个函数只对标量变量进行了广播,ndarray不变
7def _broadcast(
8 value: Union[SupportsFloat , NDArray[Any]],
9 dtype: np.dtype,
10 shape: tuple[int, ...],
11) -> NDArray[Any]:
12 """Handle infinite bounds and broadcast at the same time if needed.
13 包括对无穷大边界的整数化处理(实际就是±2)
14 This is needed primarily because:
15 >>> import numpy as np
16 >>> np.full((2,), np.inf, dtype=np.int32)
17 array([-2147483648, -2147483648], dtype=int32)
18 """
19 # 对于value值只是一个数字,直接扩展到shape大小
20 if is_float_integer(value):
21 # 判断value是否为负无穷大,以及类型是否为signed integer(缩写i)。对于float类型,-np.inf不用修改
22 if np.isneginf(value) and np.dtype(dtype).kind == 'i':
23 # np.iinfo表示机器对整数类型的限制(根据机器不同会有所区别),这里根据实际类型将-np.inf值转换为对应dtype类型的下限。
24 # 至于为什么是+2 我也不知道
25 value = np.iinfo(dtype).min + 2
26 # 判断value是否为正无穷大,以及类型是否为signed integer(缩写i),对于float类型,np.inf不用修改
27 elif np.isposinf(value) and np.dtype(dtype).kind == 'i':
28 # 这里根据实际类型将np.inf值转换为对应dtype类型的上限
29 # 至于为什么是-2 我也不知道
30 value = np.iinfo(dtype).max - 2
31 # 使用np.full 填充常常数
32 return np.full(shape, value, dtype=dtype)
33 elif isinstance(value, np.ndarray):
34 # 对于np.array类型,不需要再次填充,但是需要根据数据类型对无穷(inf)进行对应转换,因为astype对np.inf向整数类型的转换存在问题
35 # this is needed because we can't stuff np.iinfo(int).min into an array of dtype float
36 casted_value = value.astype(dtype)
37 # 对于整数类型无穷的转换处理(float类型与非inf值不用额外处理)
38 if np.dtype(dtype).kind == 'i':
39 # 只处理inf值
40 casted_value[np.isneginf(value)] = np.iinfo(dtype).min + 2
41 casted_value[np.isposinf(value)] = np.iinfo(dtype).max - 2
42 return casted_value
43 else:
44 # only np.ndarray allowed beyond this point
45 raise TypeError(
46 f"Unknown dtype for `value`, expected `np.ndarray` or float/integer, got {type(value)}"
47 )
48
def get_precision(dtype: np.dtype) -> SupportsFloat:
    """Get precision of a data type.

    Floating-point dtypes report ``np.finfo(dtype).precision``; every other
    dtype is treated as having unlimited precision and yields ``np.inf``.
    """
    if not np.issubdtype(dtype, np.floating):
        return np.inf
    return np.finfo(dtype).precision
57
58def _short_repr(arr:NDArray[Any])-> str:
59 # 主要用来打印的时期,以何种方式简要地说明arr参数
60 """Create a shortened string representation of a numpy array.
61
62 If arr is a multiple of the all-ones vector, return a string representation of the multiplier.
63 Otherwise, return a string representation of the entire array.
64
65 Args:
66 arr: The array to represent
67
68 Returns:
69 A short representation of the array
70 """
71 # 首先处理特殊情况,一般情况就是使用arr自身的__repr__功能
72 if arr.size != 0 and np.min(arr) == np.max(arr):
73 return str(np.min(arr))
74 # 主要还是利用array自身的__repr__功能
75 return str(arr)
python
具体实现自己的MyBox模块:
1from typing import Any, Iterable, Mapping
2from numpy.typing import NDArray # NDArray Can be used during runtime for typing arrays with a given dtype and unspecified shape.
3from typing import Sequence, SupportsFloat, Generator, Optional, Union, Any
4import numpy as np
5import numpy.typing as npt
6
class MyBox(gym.spaces.Space[NDArray[Any]]):
    """A (possibly unbounded) box space, re-implemented on top of ``gym.spaces.Space``.

    Every dimension has a lower and an upper bound; either may be infinite.
    The per-dimension boundedness is recorded in ``bounded_below`` /
    ``bounded_above`` and decides which distribution ``sample`` draws from.
    """

    def __init__(
        self,
        low: Union[SupportsFloat, NDArray[Any]],
        high: Union[SupportsFloat, NDArray[Any]],
        shape: Optional[Sequence[int]] = None,
        dtype: Union[type[np.floating[Any]], type[np.integer[Any]]] = np.float32,
        seed: Optional[Union[int, np.random.Generator]] = None,
    ):
        """Build the box from its bounds.

        Args:
            low: Lower bound, either a scalar or an array.
            high: Upper bound, either a scalar or an array.
            shape: Shape of the space. When omitted it is taken from ``low``
                or ``high`` if one of them is an array, and is ``(1,)`` when
                both are scalars.
            dtype: Dtype of the elements of the space.
            seed: Forwarded unchanged to ``gym.spaces.Space.__init__``.

        Raises:
            ValueError: If the shape cannot be inferred, if some ``low`` value
                exceeds the matching ``high`` value, if ``low`` contains
                ``np.inf`` or if ``high`` contains ``-np.inf``.
        """
        assert (dtype is not None), "Box dtype must be explicitly provided, cannot be None."
        self.dtype = np.dtype(dtype)

        # Determine the shape if it isn't provided directly.
        if shape is not None:
            assert all(np.issubdtype(type(dim), np.integer) for dim in shape), f"Expected all shape elements to be an integer, actual type: {tuple(type(dim) for dim in shape)}"
            # Normalise to a tuple of ints so that a list such as [3, 4]
            # compares equal to `ndarray.shape` in the checks below.
            shape = tuple(int(dim) for dim in shape)
        elif isinstance(low, np.ndarray):
            shape = low.shape
        elif isinstance(high, np.ndarray):
            shape = high.shape
        elif is_float_integer(low) and is_float_integer(high):
            shape = (1, )
        else:
            raise ValueError(f"Box shape is inferred from low and high, expected their types to be np.ndarray, an integer or a float, actual type low: {type(low)}, high: {type(high)}")

        # Capture the boundedness information before infinities are replaced
        # by `_broadcast`. Scalars are expanded to `shape`; arrays are used as is.
        _low = np.full(shape, low, dtype=float) if is_float_integer(low) else low
        # A dimension is bounded below when its lower bound is greater than -inf.
        self.bounded_below: NDArray[np.bool_] = -np.inf < _low

        _high = np.full(shape, high, dtype=float) if is_float_integer(high) else high
        # A dimension is bounded above when its upper bound is smaller than +inf.
        self.bounded_above: NDArray[np.bool_] = np.inf > _high
        # Boundedness selects the distribution used per dimension in `sample`.

        # Record the precision of array bounds *before* they are cast to
        # `self.dtype`; after the cast there is nothing left to compare.
        # Scalar bounds carry no dtype of their own and are not considered.
        input_precisions = [
            get_precision(bound.dtype)
            for bound in (low, high)
            if isinstance(bound, np.ndarray)
        ]

        # Turn scalar bounds into arrays and cast everything to `self.dtype`.
        low = _broadcast(low, self.dtype, shape)
        high = _broadcast(high, self.dtype, shape)

        # After broadcasting, both bounds must be arrays of the space's shape.
        assert isinstance(low, np.ndarray)
        assert (low.shape == shape), f"low.shape doesn't match provided shape, low.shape: {low.shape}, shape: {shape}"

        assert isinstance(high, np.ndarray)
        assert (high.shape == shape), f"high.shape doesn't match provided shape, high.shape: {high.shape}, shape: {shape}"

        # Validate the relation between the bounds.
        if np.any(low > high):
            raise ValueError(f"Some low values are greater than high, low={low}, high={high}")
        # A lower bound of +inf or an upper bound of -inf can never be valid.
        if np.any(np.isposinf(low)):
            raise ValueError(f"No low value can be equal to `np.inf`, low={low}")
        if np.any(np.isneginf(high)):
            raise ValueError(f"No high value can be equal to `-np.inf`, high={high}")

        self._shape: tuple[int, ...] = shape

        # `_broadcast` already returned arrays of `self.dtype`; `astype` here
        # only makes another copy with the same dtype.
        self.low = low.astype(self.dtype)
        self.high = high.astype(self.dtype)

        # Compact textual descriptions used by `__repr__`.
        self.low_repr = _short_repr(self.low)
        self.high_repr = _short_repr(self.high)

        # Warn when the supplied arrays were more precise than the space dtype.
        dtype_precision = get_precision(self.dtype)
        if input_precisions and min(input_precisions) > dtype_precision:
            gym.logger.warn(f"Box bound precision lowered by casting to {self.dtype}")

        # Finally initialise the parent class.
        super().__init__(self.shape, self.dtype, seed)

    # Read-only properties.
    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the space; returns the tuple stored by ``__init__``."""
        return self._shape

    @property
    def is_np_flattenable(self):
        """Checks whether this space can be flattened to a :class:`spaces.Box`."""
        return True

    def is_bounded(self, manner: str = "both") -> bool:
        """Check whether the box is bounded in the requested manner.

        Args:
            manner (str): One of ``"both"``, ``"below"``, ``"above"``.

        Returns:
            True when every dimension is bounded in the requested manner.

        Raises:
            ValueError: If ``manner`` is not ``"both"``, ``"below"`` or ``"above"``.
        """
        # The per-dimension flags were computed in `__init__`.
        below = bool(np.all(self.bounded_below))
        above = bool(np.all(self.bounded_above))
        if manner == "both":
            return below and above
        elif manner == "below":
            return below
        elif manner == "above":
            return above
        else:
            raise ValueError(f"manner is not in {{'below', 'above', 'both'}}, actual value: {manner}")

    def sample(self, mask: None = None) -> NDArray[Any]:
        r"""Generate a single random sample inside the Box.

        Each dimension is sampled independently, according to the form of its
        interval:

        * :math:`[a, b]` : uniform distribution
        * :math:`[a, \infty)` : shifted exponential distribution
        * :math:`(-\infty, b]` : shifted negative exponential distribution
        * :math:`(-\infty, \infty)` : normal distribution

        Args:
            mask: Not supported; must be ``None``.

        Returns:
            A value sampled from the Box, cast to the space's dtype.

        Raises:
            gym.error.Error: If a mask is provided.
        """
        if mask is not None:
            raise gym.error.Error(
                f"Box.sample cannot be provided a mask, actual value: {mask}"
            )
        # For non-float dtypes the upper bound is raised by one: samples are
        # floored below, so this lets `high` itself be drawn.
        high = self.high if self.dtype.kind == "f" else self.high.astype("int64") + 1

        sample = np.empty(self.shape)

        # Boolean masks (same shape as the space) selecting each interval form.
        unbounded = ~self.bounded_below & ~self.bounded_above
        upp_bounded = ~self.bounded_below & self.bounded_above
        low_bounded = self.bounded_below & ~self.bounded_above
        bounded = self.bounded_below & self.bounded_above

        # `mask[mask].shape` is the number of selected entries; `self.np_random`
        # is provided by the parent class.
        # Unbounded dimensions: normal distribution.
        sample[unbounded] = self.np_random.normal(size=unbounded[unbounded].shape)

        # Lower-bounded only: exponential on [0, inf) shifted by the lower bound.
        sample[low_bounded] = (self.np_random.exponential(size=low_bounded[low_bounded].shape) + self.low[low_bounded])

        # Upper-bounded only: negated exponential shifted by the (adjusted) upper bound.
        sample[upp_bounded] = (-self.np_random.exponential(size=upp_bounded[upp_bounded].shape) + high[upp_bounded])

        # Bounded on both sides: uniform distribution.
        sample[bounded] = self.np_random.uniform(low=self.low[bounded], high=high[bounded], size=bounded[bounded].shape)

        # Integer, unsigned and boolean dtypes are rounded down before the cast.
        if self.dtype.kind in ["i", "u", "b"]:
            sample = np.floor(sample)

        return sample.astype(self.dtype)

    def contains(self, x: Any) -> bool:
        """Return boolean specifying if x is a valid member of this space."""
        if not isinstance(x, np.ndarray):
            # Try to convert non-array input first; failure means "not a member".
            gym.logger.warn("Casting input x to numpy array.")
            try:
                x = np.asarray(x, dtype=self.dtype)
            except (ValueError, TypeError):
                return False

        return bool(
            np.can_cast(x.dtype, self.dtype)  # dtype is convertible
            and x.shape == self.shape  # shape matches
            and np.all(x >= self.low)  # within the bounds
            and np.all(x <= self.high)
        )

    def to_jsonable(self, sample_n: Sequence[NDArray[Any]]) -> list[list]:
        """Convert a batch of samples from this space to a JSONable data type."""
        return [sample.tolist() for sample in sample_n]

    def from_jsonable(self, sample_n: Sequence[Union[float, int]]) -> list[NDArray[Any]]:
        """Convert a JSONable data type to a batch of samples from this space."""
        return [np.asarray(sample, dtype=self.dtype) for sample in sample_n]

    def __repr__(self) -> str:
        """A string representation of this space.

        The representation will include bounds, shape and dtype.
        If a bound is uniform, only the corresponding scalar will be given to avoid redundant and ugly strings.

        Returns:
            A representation of the space
        """
        return f"Box({self.low_repr}, {self.high_repr}, {self.shape}, {self.dtype})"

    def __eq__(self, other: Any) -> bool:
        """Check whether `other` is equivalent to this instance. Doesn't check dtype equivalence."""
        return (
            isinstance(other, MyBox)  # same kind of space
            and (self.shape == other.shape)  # same shape
            and np.allclose(self.low, other.low)  # bounds equal within float tolerance
            and np.allclose(self.high, other.high)
        )

    def __setstate__(self, state: Union[Iterable[tuple[str, Any]], Mapping[str, Any]]):
        """Sets the state of the box for unpickling a box with legacy support."""
        super().__setstate__(state)

        # Legacy support: re-add "low_repr" and "high_repr" if missing from the pickled state.
        if not hasattr(self, "low_repr"):
            self.low_repr = _short_repr(self.low)

        if not hasattr(self, "high_repr"):
            self.high_repr = _short_repr(self.high)
python
测试代码:
# Demo: a 3x4 float64 box whose rows mix fully bounded, unbounded,
# upper-bounded-only and lower-bounded-only dimensions.
testbox = MyBox(
    # Lower bounds.
    np.array([[0,0,0,0],
              [-np.inf,-np.inf,-np.inf,-np.inf],
              [-np.inf,-np.inf,0,0]]
             ),
    # Upper bounds.
    np.array([[1,1,1,1],
              [np.inf,np.inf, 1, 1],
              [1, 1, np.inf, np.inf]]
             ),
    (3,4),
    dtype=np.float64)
print(testbox.shape, testbox.is_bounded("both"), testbox.is_np_flattenable)
# (3, 4) False True

# Per-dimension boundedness flags, one random sample, then a membership test
# on a nested list (which makes `contains` log a casting warning).
print(testbox.bounded_above)
print(testbox.bounded_below)
print(testbox.sample())
print(testbox.contains(([[0.1,0.1,0.1,0.1],[0,0,0,0],[0,0,0.1,0.1]])))

# Output recorded from one run; the sampled values are random and will differ.
'''
[[ True True True True]
 [False False True True]
 [ True True False False]]
[[ True True True True]
 [False False False False]
 [False False True True]]
[[ 0.02490971 0.73054417 0.12991174 0.0641791 ]
 [-0.96922534 1.03660516 0.84641361 0.84423148]
 [-2.12524701 0.19277976 1.75165337 0.16050653]]
True
/tmp/ipykernel_18223/1967140855.py:169: UserWarning: WARN: Casting input x to numpy array.
 gym.logger.warn("Casting input x to numpy array.")
'''
python