"""Walkthrough of the ``Box`` space from python-gym (article dated Oct 23, 2023).

Helper functions used by the Box space:

* ``is_float_integer()`` -- tell whether a value is an integer or a float.
* ``_broadcast()`` -- expand a scalar to the requested shape and dtype, and
  take care of converting ``np.inf`` for integer dtypes.
* ``get_precision()`` -- precision of a dtype (side note: Python integers
  have unlimited precision).
* ``_short_repr()`` -- turn an array into a compact description, typically
  for ``print()`` or other stringification.

After the helpers the module re-implements the space as ``MyBox`` and ends
with a small demo that only runs when the file is executed as a script.
"""

# `Generator` and `npt` are not referenced below; they are kept because the
# original listing imported them.
from typing import Any, Generator, Iterable, Mapping, Optional, Sequence, SupportsFloat, Union

# The original listing uses `gym.` without importing it.
# NOTE(review): `import gymnasium as gym` is presumably a drop-in alternative
# for the attributes used here (spaces.Space, logger.warn, error.Error) -- confirm.
import gym
import numpy as np
import numpy.typing as npt
# NDArray can be used during runtime for typing arrays with a given dtype and
# unspecified shape.
from numpy.typing import NDArray


def is_float_integer(var: Any) -> bool:
    """Checks if a variable is an integer or float."""
    return np.issubdtype(type(var), np.integer) or np.issubdtype(type(var), np.floating)


def _broadcast(
    value: Union[SupportsFloat, NDArray[Any]],
    dtype: np.dtype,
    shape: tuple[int, ...],
) -> NDArray[Any]:
    """Handle infinite bounds and broadcast at the same time if needed.

    Only scalars are actually broadcast; an ndarray keeps its own shape and is
    merely cast to ``dtype``. For signed-integer dtypes, infinite bounds are
    replaced by ``iinfo(dtype).min + 2`` / ``iinfo(dtype).max - 2``.

    This is needed primarily because:
        >>> import numpy as np
        >>> np.full((2,), np.inf, dtype=np.int32)
        array([-2147483648, -2147483648], dtype=int32)

    Args:
        value: A scalar (integer/float) or an ndarray of bounds.
        dtype: Target dtype of the returned array.
        shape: Shape used when ``value`` is a scalar.

    Returns:
        An ndarray of ``dtype`` holding the (possibly patched) bounds.

    Raises:
        TypeError: If ``value`` is neither a scalar number nor an ndarray.
    """
    if is_float_integer(value):
        # Scalar: patch infinities for signed integers (kind 'i'); float
        # dtypes can store +/-inf directly, so they are left untouched.
        # NOTE(review): the margin of 2 presumably leaves headroom for the
        # `+ 1` applied to integer upper bounds in `sample` -- confirm.
        if np.isneginf(value) and np.dtype(dtype).kind == 'i':
            value = np.iinfo(dtype).min + 2
        elif np.isposinf(value) and np.dtype(dtype).kind == 'i':
            value = np.iinfo(dtype).max - 2
        # Fill an array of the requested shape with the constant.
        return np.full(shape, value, dtype=dtype)
    elif isinstance(value, np.ndarray):
        # No filling needed for arrays, but infinities must be converted for
        # integer dtypes because astype() does not map np.inf sensibly.
        # this is needed because we can't stuff np.iinfo(int).min into an array of dtype float
        casted_value = value.astype(dtype)
        if np.dtype(dtype).kind == 'i':
            # Only the infinite entries are overwritten.
            casted_value[np.isneginf(value)] = np.iinfo(dtype).min + 2
            casted_value[np.isposinf(value)] = np.iinfo(dtype).max - 2
        return casted_value
    else:
        # only np.ndarray allowed beyond this point
        raise TypeError(
            f"Unknown dtype for `value`, expected `np.ndarray` or float/integer, got {type(value)}"
        )


def get_precision(dtype: np.dtype) -> SupportsFloat:
    """Get precision of a data type.

    Floating dtypes report ``np.finfo(dtype).precision``; every other dtype
    is treated as having infinite precision.
    """
    if np.issubdtype(dtype, np.floating):
        return np.finfo(dtype).precision
    else:
        return np.inf


def _short_repr(arr: NDArray[Any]) -> str:
    """Create a shortened string representation of a numpy array.

    If arr is a multiple of the all-ones vector, return a string representation of the multiplier.
    Otherwise, return a string representation of the entire array.

    Args:
        arr: The array to represent

    Returns:
        A short representation of the array
    """
    # Special case first: a non-empty array whose entries are all equal.
    if arr.size != 0 and np.min(arr) == np.max(arr):
        return str(np.min(arr))
    # General case: rely on the array's own string conversion.
    return str(arr)


class MyBox(gym.spaces.Space[NDArray[Any]]):
    """A re-implementation of the Box space with per-dimension bounds."""

    def __init__(self,
                 low: Union[SupportsFloat, NDArray[Any]],
                 high: Union[SupportsFloat, NDArray[Any]],
                 shape: Optional[Sequence[int]] = None,
                 dtype: Union[type[np.floating[Any]], type[np.integer[Any]]] = np.float32,
                 seed: Optional[Union[int, np.random.Generator]] = None):
        """Build the space from lower/upper bounds.

        Args:
            low: Lower bound(s), a scalar or an ndarray.
            high: Upper bound(s), a scalar or an ndarray.
            shape: Shape of the space; inferred from ``low``/``high`` when omitted.
            dtype: Element dtype of the space; must not be None.
            seed: Forwarded unchanged to the parent constructor.

        Raises:
            ValueError: If the shape cannot be inferred, some low value exceeds
                the matching high value, a low value is ``np.inf`` or a high
                value is ``-np.inf``.
        """
        assert (dtype is not None), "Box dtype must be explicitly provided, cannot be None."
        self.dtype = np.dtype(dtype)

        # determine shape if it isn't provided directly
        if shape is not None:
            assert all(np.issubdtype(type(dim), np.integer) for dim in shape), f"Expected all shape elements to be an integer, actual type: {tuple(type(dim) for dim in shape)}"
            # Normalise any integer sequence (e.g. a list) to a tuple of ints so
            # the `low.shape == shape` checks below compare tuple with tuple.
            shape = tuple(int(dim) for dim in shape)
        elif isinstance(low, np.ndarray):
            shape = low.shape
        elif isinstance(high, np.ndarray):
            shape = high.shape
        elif is_float_integer(low) and is_float_integer(high):
            shape = (1, )
        else:
            raise ValueError(f"Box shape is inferred from low and high, expected their types to be np.ndarray, an integer or a float, actual type low: {type(low)}, high: {type(high)}")

        # Capture the boundedness information before replacing np.inf with get_inf
        # A scalar bound is expanded to `shape`; anything else (e.g. an ndarray)
        # is used as given.
        _low = np.full(shape, low, dtype=float) if is_float_integer(low) else low

        # Element-wise flag: True where the lower bound is finite-from-below.
        # `low` must support comparison with a float (int, float or ndarray).
        self.bounded_below: NDArray[np.bool_] = -np.inf < _low
        # Element-wise flag: True where the upper bound is finite-from-above.
        _high = np.full(shape, high, dtype=float) if is_float_integer(high) else high
        self.bounded_above: NDArray[np.bool_] = np.inf > _high
        # Boundedness decides which distribution `sample` draws from.

        # Scalars are expanded to arrays here; arrays are cast to self.dtype.
        low = _broadcast(low, self.dtype, shape)
        high = _broadcast(high, self.dtype, shape)

        # After broadcasting, low must be an ndarray of exactly `shape`.
        assert isinstance(low, np.ndarray)
        assert (low.shape == shape), f"low.shape doesn't match provided shape, low.shape: {low.shape}, shape: {shape}"

        # Same checks for high.
        assert isinstance(high, np.ndarray)
        assert (high.shape == shape), f"high.shape doesn't match provided shape, high.shape: {high.shape}, shape: {shape}"

        # Validate the ordering of the bounds.
        if np.any(low > high):
            raise ValueError(f"Some low values are greater than high, low={low}, high={high}")
        # A lower bound of +inf or an upper bound of -inf is always invalid.
        if np.any(np.isposinf(low)):
            raise ValueError(f"No low value can be equal to `np.inf`, low={low}")
        if np.any(np.isneginf(high)):
            raise ValueError(f"No high value can be equal to `-np.inf`, high={high}")

        self._shape: tuple[int, ...] = shape

        # `_broadcast` already returned arrays of self.dtype, so these astype()
        # calls do not change any value; they only produce copies.
        self.low = low.astype(self.dtype)
        self.high = high.astype(self.dtype)

        # Compact text forms used by __repr__.
        self.low_repr = _short_repr(self.low)
        self.high_repr = _short_repr(self.high)

        # Both bounds already carry self.dtype at this point, so as written
        # this comparison cannot be true; kept as in the original listing.
        low_precision = get_precision(low.dtype)
        high_precision = get_precision(high.dtype)
        dtype_precision = get_precision(self.dtype)
        if min(low_precision, high_precision) > dtype_precision:
            gym.logger.warn(f"Box bound precision lowered by casting to {self.dtype}")

        # Finally initialise the parent class.
        super().__init__(self.shape, self.dtype, seed)

    # Read-only properties.
    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the space; returns the tuple stored by ``__init__``."""
        return self._shape

    @property
    def is_np_flattenable(self):
        """Checks whether this space can be flattened to a :class:`spaces.Box`."""
        return True

    def is_bounded(self, manner: str = "both") -> bool:
        """Check whether the box is bounded in the requested manner.

        Args:
            manner (str): One of ``"both"``, ``"below"``, ``"above"``.

        Returns:
            Whether every dimension is bounded in the requested manner.

        Raises:
            ValueError: If `manner` is neither ``"both"`` nor ``"below"`` or ``"above"``
        """
        # The per-dimension flags were computed in __init__.
        below = bool(np.all(self.bounded_below))
        above = bool(np.all(self.bounded_above))
        if manner == "both":
            return below and above
        elif manner == "below":
            return below
        elif manner == "above":
            return above
        else:
            raise ValueError(f"manner is not in {{'below', 'above', 'both'}}, actual value: {manner}")

    def sample(self, mask: None = None) -> NDArray[Any]:
        r"""Generate a single random sample inside the Box.

        Each dimension is sampled independently, according to the form of
        its interval:

        * :math:`[a, b]` : uniform distribution
        * :math:`[a, \infty)` : shifted exponential distribution
        * :math:`(-\infty, b]` : shifted negative exponential distribution
        * :math:`(-\infty, \infty)` : normal distribution

        Args:
            mask: Not supported; must be None.

        Returns:
            A value sampled from the Box, cast to the space's dtype.
        """
        if mask is not None:
            raise gym.error.Error(
                f"Box.sample cannot be provided a mask, actual value: {mask}"
            )
        # For non-float dtypes the upper bound is raised by one: the draws are
        # floored below, so `high + 1` makes `high` itself reachable.
        high = self.high if self.dtype.kind == "f" else self.high.astype("int64") + 1

        sample = np.empty(self.shape)

        # Boolean masks (element-wise ~ and &) selecting the four interval kinds.
        unbounded = ~self.bounded_below & ~self.bounded_above
        upp_bounded = ~self.bounded_below & self.bounded_above
        low_bounded = self.bounded_below & ~self.bounded_above
        bounded = self.bounded_below & self.bounded_above

        # `mask[mask]` keeps only the True entries, so its shape is the number
        # of selected dimensions. `self.np_random` comes from the parent class.
        # Unbounded: normal distribution.
        sample[unbounded] = self.np_random.normal(size=unbounded[unbounded].shape)

        # Bounded below only: exponential draw shifted up by the lower bound.
        sample[low_bounded] = (self.np_random.exponential(size=low_bounded[low_bounded].shape) + self.low[low_bounded])

        # Bounded above only: negated exponential draw shifted by the
        # (possibly +1 adjusted) local `high`, not `self.high`.
        sample[upp_bounded] = (-self.np_random.exponential(size=upp_bounded[upp_bounded].shape) + high[upp_bounded])

        # Bounded on both sides: uniform distribution.
        sample[bounded] = self.np_random.uniform(low=self.low[bounded], high=high[bounded], size=bounded[bounded].shape)

        # Integer-like dtypes (signed, unsigned, bool) are rounded down.
        if self.dtype.kind in ["i", "u", "b"]:
            sample = np.floor(sample)

        return sample.astype(self.dtype)

    def contains(self, x: Any) -> bool:
        """Return boolean specifying if x is a valid member of this space."""
        if not isinstance(x, np.ndarray):
            # Try to convert non-array input first; failure means "not a member".
            gym.logger.warn("Casting input x to numpy array.")
            try:
                x = np.asarray(x, dtype=self.dtype)
            except (ValueError, TypeError):
                return False

        return bool(np.can_cast(x.dtype, self.dtype)  # dtype is castable
                    and x.shape == self.shape  # same shape
                    and np.all(x >= self.low)  # within the bounds
                    and np.all(x <= self.high))

    def to_jsonable(self, sample_n: Sequence[NDArray[Any]]) -> list[list]:
        """Convert a batch of samples from this space to a JSONable data type."""
        return [sample.tolist() for sample in sample_n]

    def from_jsonable(self, sample_n: Sequence[Union[float, int]]) -> list[NDArray[Any]]:
        """Convert a JSONable data type to a batch of samples from this space."""
        return [np.asarray(sample, dtype=self.dtype) for sample in sample_n]

    def __repr__(self) -> str:
        """A string representation of this space.

        The representation will include bounds, shape and dtype.
        If a bound is uniform, only the corresponding scalar will be given to avoid redundant and ugly strings.

        Returns:
            A representation of the space
        """
        return f"Box({self.low_repr}, {self.high_repr}, {self.shape}, {self.dtype})"

    def __eq__(self, other: Any) -> bool:
        """Check whether `other` is equivalent to this instance. Doesn't check dtype equivalence."""
        return (
            isinstance(other, MyBox)  # compatible class
            and (self.shape == other.shape)  # same shape
            and np.allclose(self.low, other.low)  # bounds equal within float tolerance
            and np.allclose(self.high, other.high)
        )

    def __setstate__(self, state: Union[Iterable[tuple[str, Any]], Mapping[str, Any]]):
        """Sets the state of the box for unpickling a box with legacy support."""
        super().__setstate__(state)

        # legacy support through re-adding "low_repr" and "high_repr" if missing from pickled state
        if not hasattr(self, "low_repr"):
            self.low_repr = _short_repr(self.low)

        if not hasattr(self, "high_repr"):
            self.high_repr = _short_repr(self.high)


# Demo / test code from the article; guarded so importing the module has no
# side effects.
if __name__ == "__main__":
    testbox = MyBox(
        np.array([[0, 0, 0, 0],
                  [-np.inf, -np.inf, -np.inf, -np.inf],
                  [-np.inf, -np.inf, 0, 0]]
                 ),
        np.array([[1, 1, 1, 1],
                  [np.inf, np.inf, 1, 1],
                  [1, 1, np.inf, np.inf]]
                 ),
        (3, 4),
        dtype=np.float64)
    print(testbox.shape, testbox.is_bounded("both"), testbox.is_np_flattenable)
    # (3, 4) False True

    print(testbox.bounded_above)
    print(testbox.bounded_below)
    print(testbox.sample())
    print(testbox.contains(([[0.1, 0.1, 0.1, 0.1], [0, 0, 0, 0], [0, 0, 0.1, 0.1]])))

    # Output recorded in the article (the sampled values are random):
    #
    # [[ True  True  True  True]
    #  [False False  True  True]
    #  [ True  True False False]]
    # [[ True  True  True  True]
    #  [False False False False]
    #  [False False  True  True]]
    # [[ 0.02490971  0.73054417  0.12991174  0.0641791 ]
    #  [-0.96922534  1.03660516  0.84641361  0.84423148]
    #  [-2.12524701  0.19277976  1.75165337  0.16050653]]
    # True
    # /tmp/ipykernel_18223/1967140855.py:169: UserWarning: WARN: Casting input x to numpy array.
    #   gym.logger.warn("Casting input x to numpy array.")