python-gym中box空间环境解读

python-gym中box空间环境解读

Box空间环境中使用的辅助函数:

  • is_float_integer()判断是否为整数或浮点数
  • _broadcast()将标量扩充为给定的shape大小与dtype类型,附带处理np.inf的转换问题。
  • get_precision()获取数据类型精度,小知识:python中整型精度是无限。
  • _short_repr()如何将对象转换为一种简单的描述,一般用于print()或者文本化。
 1# 查看变量是否为整数或浮点数
 2def is_float_integer(var: Any) -> bool:
 3    """Checks if a variable is an integer or float."""
 4    return np.issubdtype(type(var), np.integer) or np.issubdtype(type(var), np.floating)
 5
 6# 变量广播,实际上这个函数只对标量变量进行了广播,ndarray不变
 7def _broadcast(
 8        value: Union[SupportsFloat , NDArray[Any]],
 9        dtype: np.dtype,
10        shape: tuple[int, ...],
11) -> NDArray[Any]:
12    """Handle infinite bounds and broadcast at the same time if needed.
13       包括对无穷大边界的整数化处理(实际就是±2)
14    This is needed primarily because:
15        >>> import numpy as np
16        >>> np.full((2,), np.inf, dtype=np.int32)
17        array([-2147483648, -2147483648], dtype=int32)
18    """
19    # 对于value值只是一个数字,直接扩展到shape大小
20    if is_float_integer(value):
21        # 判断value是否为负无穷大,以及类型是否为signed integer(缩写i)。对于float类型,-np.inf不用修改
22        if np.isneginf(value) and np.dtype(dtype).kind == 'i':
23            # np.iinfo表示机器对整数类型的限制(根据机器不同会有所区别),这里根据实际类型将-np.inf值转换为对应dtype类型的下限。
24            # 至于为什么是+2 我也不知道
25            value  = np.iinfo(dtype).min + 2
26        # 判断value是否为正无穷大,以及类型是否为signed integer(缩写i),对于float类型,np.inf不用修改
27        elif np.isposinf(value) and np.dtype(dtype).kind == 'i':
28            # 这里根据实际类型将np.inf值转换为对应dtype类型的上限
29            # 至于为什么是-2 我也不知道
30            value = np.iinfo(dtype).max - 2
31        # 使用np.full 填充常常数
32        return np.full(shape, value, dtype=dtype)
33    elif isinstance(value, np.ndarray):
34        # 对于np.array类型,不需要再次填充,但是需要根据数据类型对无穷(inf)进行对应转换,因为astype对np.inf向整数类型的转换存在问题
35        # this is needed because we can't stuff np.iinfo(int).min into an array of dtype float
36        casted_value = value.astype(dtype)
37        # 对于整数类型无穷的转换处理(float类型与非inf值不用额外处理)
38        if np.dtype(dtype).kind == 'i':
39            # 只处理inf值
40            casted_value[np.isneginf(value)] = np.iinfo(dtype).min + 2
41            casted_value[np.isposinf(value)] = np.iinfo(dtype).max - 2
42        return casted_value
43    else:
44        # only np.ndarray allowed beyond this point
45        raise TypeError(
46            f"Unknown dtype for `value`, expected `np.ndarray` or float/integer, got {type(value)}"
47        )
48    
49def get_precision(dtype: np.dtype) -> SupportsFloat:
50    """Get precision of a data type."""
51    # 获取数据类型的精度,只针对float类型,其他类型精度无限?
52    # 在Python 中整型的取值范围是无限的(整数具有无限的精度)
53    if np.issubdtype(dtype, np.floating):
54        return np.finfo(dtype).precision
55    else:
56        return np.inf
57
58def _short_repr(arr:NDArray[Any])-> str:
59    # 主要用来打印的时期,以何种方式简要地说明arr参数
60    """Create a shortened string representation of a numpy array.
61
62    If arr is a multiple of the all-ones vector, return a string representation of the multiplier.
63    Otherwise, return a string representation of the entire array.
64
65    Args:
66        arr: The array to represent
67
68    Returns:
69        A short representation of the array
70    """
71    # 首先处理特殊情况,一般情况就是使用arr自身的__repr__功能
72    if arr.size != 0 and np.min(arr) == np.max(arr):
73        return str(np.min(arr))
74    # 主要还是利用array自身的__repr__功能
75    return str(arr)

具体实现自己的MyBox模块:

  1from typing import Any, Iterable, Mapping
  2from numpy.typing import NDArray # NDArray Can be used during runtime for typing arrays with a given dtype and unspecified shape.
  3from typing import Sequence, SupportsFloat, Generator, Optional, Union, Any
  4import numpy as np
  5import numpy.typing as npt
  6
  7class MyBox(gym.spaces.Space[NDArray[any]]):
  8    def __init__(self, 
  9                 low: Union[SupportsFloat, NDArray[Any]],
 10                 high: Union[SupportsFloat, NDArray[Any]],
 11                 shape: Optional[Sequence[int]] = None, 
 12                 dtype: Union[type[np.floating[Any]], type[np.integer[Any]]] = np.float32, 
 13                 seed: Optional[Union[int, Generator]] = None):
 14        # super().__init__(shape, dtype, seed)
 15        assert (dtype is not None), "Box dtype must be explicitly provided, cannot be None."
 16        self.dtype = np.dtype(dtype)
 17
 18        # determine shape if it isn't provided directly
 19        if shape is not None:
 20            assert all(np.issubdtype(type(dim), np.integer) for dim in shape), f"Expected all shape elements to be an integer, actual type: {tuple(type(dim) for dim in shape)}"
 21        elif isinstance(low, np.ndarray):
 22            shape = low.shape
 23        elif isinstance(high, np.ndarray):
 24            shape = high.shape
 25        elif is_float_integer(low) and is_float_integer(high):
 26            shape = (1, )
 27        else:
 28            raise ValueError(f"Box shape is inferred from low and high, expected their types to be np.ndarray, an integer or a float, actual type low: {type(low)}, high: {type(high)}")
 29        
 30        # Capture the boundedness information before replacing np.inf with get_inf
 31        # 对于单个low数字,使用shape大小进行填充,对于其他如ndarray则保留原有形式
 32        _low = np.full(shape, low, dtype=float) if is_float_integer(low) else low
 33
 34        # 记录是否有界,如果_low中元素都大于-np.inf就认为是有下界的
 35        # 注意low的类型必须是int,float或np.array否则无法重载比较运算符
 36        self.bounded_below: NDArray[np.bool_] = -np.inf < _low
 37        # 记录是否有界,如果_high中元素都小于np.inf就认为是有下界的
 38        # 注意high的类型必须是int,float或np.array否则无法重载比较运算符
 39        _high = np.full(shape, high, dtype=float) if is_float_integer(high) else high
 40        self.bounded_above: NDArray[np.bool_] = np.inf > _high
 41        # 注:是否有界会影响抽样函数sample函数使用的分布函数。
 42
 43        # 如果low,high是单个数字,将他们扩展成一个ndarray
 44        low = _broadcast(low, self.dtype, shape)
 45        high = _broadcast(high, self.dtype, shape)
 46
 47        # 确保low经过前面操作,已经是np.ndarray类型
 48        assert isinstance(low, np.ndarray)
 49        # 确保low的shape与参数提供的shape一致
 50        assert (low.shape == shape), f"low.shape doesn't match provided shape, low.shape: {low.shape}, shape: {shape}"
 51        
 52        # 确保high经过前面操作,已经是np.ndarray类型
 53        assert isinstance(high, np.ndarray)
 54        # 确保high的shape与参数提供的shape一致
 55        assert (high.shape == shape), f"high.shape doesn't match provided shape, high.shape: {high.shape}, shape: {shape}"
 56
 57        # 检验low与high的大小关系是否正确
 58        if np.any(low > high):# low中有任一元素大于high
 59             raise ValueError(f"Some low values are greater than high, low={low}, high={high}")
 60        # 如果low的值是正无穷或者high的值是负无穷,肯定是错的
 61        if np.any(np.isposinf(low)):
 62            raise ValueError(f"No low value can be equal to `np.inf`, low={low}")
 63        if np.any(np.isneginf(high)):
 64            raise ValueError(f"No high value can be equal to `-np.inf`, high={high}")
 65        
 66        # 确定shape变量没问题,赋值
 67        self._shape:tuple[int,...] = shape
 68
 69        # 确定low,high没问题,按照dtype转换成相应类型赋值
 70        # 我的疑问:在_broadcast()函数中已经做过了一样的操作,完成了转换,这里有必要再用一次astype()吗?
 71        self.low = low.astype(self.dtype)
 72        self.high = high.astype(self.dtype)
 73
 74        # 生成简洁的文本化描述
 75        self.low_repr = _short_repr(self.low)
 76        self.high_repr = _short_repr(self.high)
 77
 78        # 获取数据精度,暂时不知道用处
 79        low_precision = get_precision(low.dtype)
 80        high_precision = get_precision(high.dtype)
 81        dtype_precision = get_precision(self.dtype)
 82        if min(low_precision, high_precision) > dtype_precision:
 83            gym.logger.warn(f"Box bound precision lowered by casting to {self.dtype}")
 84
 85        # 最后调用父函数
 86        super().__init__(self.shape, self.dtype, seed)
 87
 88
 89    # 设置一些只读属性
 90    @property
 91    def shape(self) -> tuple[int, ...]:
 92        # 比父类gym.Space更加严格,永远不要是None
 93        return self._shape
 94    
 95    @property
 96    def is_np_flattenable(self):
 97        """Checks whether this space can be flattened to a :class:`spaces.Box`."""
 98        return True
 99
100    def is_bounded(self, manner:str = "both") -> bool:
101        """
102        检验box是否有界(某种程度上)
103        参数: Args : manner(str): ``"both"``, ``"below"``, ``"above"``
104        结果: 实际边界与给定的manner是否一致
105        Raises: ValueError: 如果 `manner` 不是 ``"both"`` 或 ``"below"`` 或 ``"above"``
106        """
107        # 之前在__init__()环境中,检验的是否有上下界
108        below = bool(np.all(self.bounded_below))
109        above = bool(np.all(self.bounded_above))
110        if manner == "both":
111            return below and above
112        elif manner == "below":
113            return below
114        elif manner == "above":
115            return above
116        else: # 对于既不是both, below, above的场景,就报错
117            raise ValueError(f"manner is not in {{'below', 'above', 'both'}}, actual value: {manner}")
118        
119    def sample(self, mask:None = None) -> NDArray[Any]:
120        r"""从Box的空间随机生成一个样本
121        从box空间的每一个维度独立地抽样,抽样原则服从如下方式:
122        
123        * :math:`[a, b]` : 均匀分布
124        * :math:`[a, \infty)` : 偏移的指数分布
125        * :math:`(-\infty, b]` : 偏倚的负指数分布
126        * :math:`(-\infty, \infty)`: 正态分布
127
128        Args: mask:暂时不支持……
129
130        Returns: 一个从Box空间随机抽样的值
131        """
132        if mask is not None: # 暂时不支持mask,提供的话报错
133            raise gym.error.Error(
134                f"Box.sample cannot be provided a mask, actual value: {mask}"
135            )
136        # 抽样空间上限处理。对于整数处理时,初始化函数是-2,这里是+1,不知道为什么
137        high = self.high if self.dtype.kind == "f" else self.high.astype("int64") + 1
138
139        # 抽样结果初始化
140        sample = np.empty(self.shape)
141
142        # 查看每一个维度是否有界,注意self.bounded_below与self.bounded_above都是bool序列,~表示按位取反,&表示按位与操作
143        # 下面四个变量都是True,False的np.array数组,用于索引取舍
144        # 无界的维度,得到一个array[bool]
145        unbounded = ~self.bounded_below & ~self.bounded_above
146        # 仅有上界的维度,得到一个array[bool]
147        upp_bounded = ~self.bounded_below & self.bounded_above
148        # 仅有下界的维度,得到一个array[bool]
149        low_bounded = self.bounded_below & ~self.bounded_above
150        # 有界的维度,得到一个array[bool]
151        bounded = self.bounded_below & self.bounded_above
152
153        # 针对四种边界情形,分别根据分布类型抽样。属性self.np_random继承自父类class Space(Generic[T_cov])
154        # 无界的维度,正态分布抽样。通过unbounded的bool数组筛选项,再赋值
155        sample[unbounded] = self.np_random.normal(size=unbounded[unbounded].shape) # unbounded[unbounded]得到其中为True的项
156
157        # 有下界,使用指数分布定义域[0,+infty),加上下界作为偏移量,范围为[self.low[low_bounded], +infty)
158        sample[low_bounded] = (self.np_random.exponential(size=low_bounded[low_bounded].shape) + self.low[low_bounded])
159
160        # 有上界,使用负指数分布,定义域(-infty,0],加上上界作为偏移量,范围为(-infty, high[upp_bounded]]
161        sample[upp_bounded] = (-self.np_random.exponential(size=upp_bounded[upp_bounded].shape)+ high[upp_bounded]) # 注意小区别,这里用的是high而不是self.high,因为上限的一点处理,具体不知道为什么
162
163        # 上下界皆存在,均匀分布抽样
164        sample[bounded] = self.np_random.uniform(low=self.low[bounded], high=high[bounded], size=bounded[bounded].shape)
165        
166        # 对于整形,无符号整形等,向下取整
167        if self.dtype.kind in ["i", "u", "b"]:
168            sample = np.floor(sample)
169        
170        return sample.astype(self.dtype) # 按要求类型返回
171    
172    def contains(self, x:Any) -> bool:
173        """Return boolean specifying if x is a valid member of this space."""
174        if not isinstance(x, np.ndarray): # 如果x不是np.ndarray类型,先尝试转换成np.ndarray
175            gym.logger.warn("Casting input x to numpy array.")
176            try:
177                x = np.asarray(x, dtype=self.dtype)
178            except(ValueError, TypeError):
179                return False
180        
181        # 转换成功或者x是np.ndarray时
182        return bool(np.can_cast(x.dtype, self.dtype)# 类型可以转换
183                    and x.shape == self.shape # shape大小一致
184                    and np.all(x >= self.low) # 处于上界与下界之间
185                    and np.all(x <= self.high)) 
186    
187    # json序列化,暂时不用考虑
188    def to_jsonable(self, sample_n: Sequence[NDArray[Any]]) -> list[list]:
189        """Convert a batch of samples from this space to a JSONable data type."""
190        return [sample.tolist() for sample in sample_n]
191
192    # json反序列化,暂时不用考虑
193    def from_jsonable(self, sample_n: Sequence[Union[float, int]]) -> list[NDArray[Any]]:
194        """Convert a JSONable data type to a batch of samples from this space."""
195        return [np.asarray(sample, dtype=self.dtype) for sample in sample_n]
196
197    # 打印时如何生成string字符串
198    def __repr__(self) -> str:
199        """A string representation of this space.
200
201        The representation will include bounds, shape and dtype.
202        If a bound is uniform, only the corresponding scalar will be given to avoid redundant and ugly strings.
203
204        Returns:
205            A representation of the space
206        """
207        return f"Box({self.low_repr}, {self.high_repr}, {self.shape}, {self.dtype})"
208    
209    # 对于等号运算符的重载
210    def __eq__(self, other:Any) -> bool:
211        """Check whether `other` is equivalent to this instance. Doesn't check dtype equivalence."""
212        # 两个Box空间是否相等,看下面四条
213        return (
214            isinstance(other, MyBox) # 类的类型是否相容
215            and (self.shape == other.shape) # shape大小是否一致
216            and np.allclose(self.low, other.low) # 用于比较浮点数,两者相差是否在一定范围内
217            and np.allclose(self.high, other.high) # 上下界是否一样
218        )
219    # 用于python的反序列化,暂时不用考虑
220    def __setstate__(self, state: Union[Iterable[tuple[str, Any]], Mapping[str, Any]]):
221        """Sets the state of the box for unpickling a box with legacy support."""
222        super().__setstate__(state)
223
224        # legacy support through re-adding "low_repr" and "high_repr" if missing from pickled state
225        if not hasattr(self, "low_repr"):
226            self.low_repr = _short_repr(self.low)
227
228        if not hasattr(self, "high_repr"):
229            self.high_repr = _short_repr(self.high)

测试代码:

 1testbox = MyBox(
 2    np.array([[0,0,0,0],
 3              [-np.inf,-np.inf,-np.inf,-np.inf],
 4              [-np.inf,-np.inf,0,0]]
 5            ), 
 6    np.array([[1,1,1,1],
 7              [np.inf,np.inf, 1, 1], 
 8              [1, 1, np.inf, np.inf]]
 9            ), 
10    (3,4),
11    dtype=np.float64)
12print(testbox.shape, testbox.is_bounded("both"), testbox.is_np_flattenable)
13# (3, 4) False True
14
15print(testbox.bounded_above)
16print(testbox.bounded_below)
17print(testbox.sample())
18print(testbox.contains(([[0.1,0.1,0.1,0.1],[0,0,0,0],[0,0,0.1,0.1]])))
19
20'''
21[[ True  True  True  True]
22 [False False  True  True]
23 [ True  True False False]]
24[[ True  True  True  True]
25 [False False False False]
26 [False False  True  True]]
27[[ 0.02490971  0.73054417  0.12991174  0.0641791 ]
28 [-0.96922534  1.03660516  0.84641361  0.84423148]
29 [-2.12524701  0.19277976  1.75165337  0.16050653]]
30True
31/tmp/ipykernel_18223/1967140855.py:169: UserWarning: WARN: Casting input x to numpy array.
32  gym.logger.warn("Casting input x to numpy array.")
33'''