首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

使用SensorBridge转换激光传感器数据

上一章中,我们看到map_builder_bridge_为每一条轨迹构建了一个SensorBridge对象, 用于将ROS的消息转换成Cartographer中的传感器数据类型。同时,我们还简单的分析了它的构造函数和一些重要的字段。下面我们来查看一下激光传感器数据的处理过程。

分析node对象的时候我们已经看到,node对象在激光传感器的消息回调函数中, 通过轨迹索引trajectory_id从map_builder_bridge_对象中查询获得对应轨迹的SensorBridge对象,并通过该对象的HandleMultiEchoLaserScanMessage函数来处理雷达数据。 下面是该函数的代码片段,前两行只是定义了两个临时变量分别用于记录转换之后的点云数据和时间戳。 先在第4行中通过msg_conversion.h, msg_conversion.cc中定义和实现的函数ToPointCloudWithIntensities, 将ROS的消息转换成点云数据。然后在第5行中通过成员函数HandleLaserScan来处理点云数据。

        void SensorBridge::HandleMultiEchoLaserScanMessage(const std::string& sensor_id, const sensor_msgs::MultiEchoLaserScan::ConstPtr& msg) {
            carto::sensor::PointCloudWithIntensities point_cloud;
            carto::common::Time time;
            std::tie(point_cloud, time) = ToPointCloudWithIntensities(*msg);
            HandleLaserScan(sensor_id, time, msg->header.frame_id, point_cloud);
        }

下面让我们来依次查看一下这两个函数的实现。

1. 转换数据结构

        $ rosmsg show sensor_msgs/MultiEchoLaserScan    
        std_msgs/Header header
          uint32 seq
          time stamp
          string frame_id
        float32 angle_min
        float32 angle_max
        float32 angle_increment
        float32 time_increment
        float32 scan_time
        float32 range_min
        float32 range_max
        sensor_msgs/LaserEcho[] ranges
          float32[] echoes
        sensor_msgs/LaserEcho[] intensities
          float32[] echoes

在查看函数ToPointCloudWithIntensities之前,让我们先来看一下ROS系统的消息长什么样。如下面右侧的代码片段所示,我们可以通过工具rosmsg来查看该消息的数据结构。

其中,字段header是ROS消息通用的消息头,记录了消息的序列号、时间戳、以及参考坐标系。字段angle_min、angle_max和angle_increment的单位是弧度(rad),一起说明了激光雷达的一次扫描的起始和结束测量角, 以及采样角度步长。字段time_increment是指一次扫描中各个测量值之间的时间间隔,scan_time则是两次扫描之间的时间间隔,这两个字段的单位是秒(s)。 字段range_min和range_max说明了扫描测量的有效数据范围,单位是米(m)。

以上的这些字段,总体上说明了传感器的测量能力和方式。实际的测量数据记录在数组ranges和intensities中。 数组ranges记录了一次扫描的各个测量值,它是一个关于float32的数组,可以记录多线激光的数据,单位是米(m)。数组intensities与ranges中的数据是一一对应的,描述的是每个测量数据的强度。

让我们再来看一下Cartographer中的数据结构PointCloudWithIntensities, 它是定义在point_cloud.h中的一个结构体。 其定义形式如下面左边的代码片段所示,一共有两个字段。与ROS消息中的字段ranges和intensities类似,这两个字段中的元素也是一一对应的。

        typedef std::vector<Eigen::Vector4f> TimedPointCloud;    
        struct PointCloudWithIntensities {
            TimedPointCloud points;
            std::vector<float> intensities;
        };

字段points的数据类型是代码片段中第一行中的typedef定义的vector容器。每个元素都是一个4维的向量,其中前三维的数据记录了扫描数据所对应的空间坐标,第四维的数据则记录了数据产生的时间。 字段intensities没什么好说的,就是描述了points中每个元素所对应的数据强度。

函数ToPointCloudWithIntensities所做的工作就是根据ROS的消息内容,计算扫描到的障碍物在工作空间中的坐标位置,并将其保存在一个特定的数据结构中。下面是该函数的实现, 它只是调用了LaserScanToPointCloudWithIntensities。这是一个模板函数,翻看源文件我们会发现LaserScan类型的传感器数据也是通过这个模板函数进行解析的。一会儿分析这个模板函数的时候,我们还可以看到,对于Cartographer而言多线激光也是简化成单线激光来应用的。

        std::tuple<::cartographer::sensor::PointCloudWithIntensities, ::cartographer::common::Time> ToPointCloudWithIntensities(const sensor_msgs::MultiEchoLaserScan& msg) {
            return LaserScanToPointCloudWithIntensities(msg);
        }

下面是模板函数LaserScanToPointCloudWithIntensities的代码片段,在函数的一开始先检查输入的数据是否合法。

        template <typename LaserMessageType>
        std::tuple<PointCloudWithIntensities, ::cartographer::common::Time> LaserScanToPointCloudWithIntensities(const LaserMessageType& msg) {
            CHECK_GE(msg.range_min, 0.f);
            CHECK_GE(msg.range_max, msg.range_min);
            if (msg.angle_increment > 0.f)
                CHECK_GT(msg.angle_max, msg.angle_min);
            else
                CHECK_GT(msg.angle_min, msg.angle_max);

接着创建了一个局部的对象point_cloud用于记录转换之后的点云数据,并在一个for循环中,从最小的扫描角度开始,依次计算各个测量数据在机器人坐标系下的坐标点和数据强度, 并将之添加到对象point_cloud中。

            PointCloudWithIntensities point_cloud;
            float angle = msg.angle_min;
            for (size_t i = 0; i < msg.ranges.size(); ++i) {
                const auto& echoes = msg.ranges[i];

对于每一个扫描数据,我们都先通过函数HasEcho检查一下。 这个函数定义在源文件msg_conversion.cc中, 它有两个重载版本,分别应用于LaserScan和MultiEchoLaserScan,对于LaserScan的ROS消息这个函数将总是返回true,对于MultiEchoLaserScan则检查字段ranges是否为空。 如果存在测量数据,则通过函数GetFirstEcho获取第一个扫描数据。对于LaserScan直接返回测量数据,对于MultiEchoLaserScan则返回第一个测量数据。所以我刚刚说单线激光和多线激光在这里似乎没有什么差别, 多线激光也是当作单线使的。

                if (HasEcho(echoes)) {
                    const float first_echo = GetFirstEcho(echoes);

接下来检查一下测量数据在测量范围内就可以计算对应的坐标了。一般我们使用激光雷达都是让它水平的扫描一个平面,所以这里的第16行才会构造一个沿机器人Z轴转动的转动矢量。 在Cartographer中,X轴的正方向是机器人的前进方向。所以在第18行中通过first_echo构建一个X轴方向上的运动矢量,并左乘旋转矩阵就可以得到扫描点在机器人坐标系下的坐标。 第18行语句中逗号之后的部分就是测量数据产生的时间。

                    if (msg.range_min <= first_echo && first_echo <= msg.range_max) {
                        const Eigen::AngleAxisf rotation(angle, Eigen::Vector3f::UnitZ());
                        Eigen::Vector4f point;
                        point << rotation * (first_echo * Eigen::Vector3f::UnitX()), i * msg.time_increment;
                        point_cloud.points.push_back(point);

在计算扫描点坐标的同时,还需要检查一下是否提供了数据强度。如果提供了这一数据,就将该数据填充到局部对象point_cloud中,否则写0。

                        if (msg.intensities.size() > 0) {
                            CHECK_EQ(msg.intensities.size(), msg.ranges.size());
                            const auto& echo_intensities = msg.intensities[i];
                            CHECK(HasEcho(echo_intensities));
                            point_cloud.intensities.push_back(GetFirstEcho(echo_intensities));
                        } else {
                            point_cloud.intensities.push_back(0.f);
                        }

在for循环的最后,增加angle获取下一个测量数据所对应的扫描角度。

                    }
                }
                angle += msg.angle_increment;
            }

按理说,完成了上述的操作就已经将ROS的系统消息转换成Cartographer所需要的数据结构了。但是源码中还是对时间戳做了如下的调整,个人猜测Cartographer中的时间戳是以数据采样结束为参考的。 所以这里的时间戳以最后一个测量数据为基准,point_cloud中所有的测量点都偏移了一段时间。最后返回填充好的点云数据和调整后的时间戳。

            ::cartographer::common::Time timestamp = FromRos(msg.header.stamp);
            if (!point_cloud.points.empty()) {
                const double duration = point_cloud.points.back()[3];
                timestamp += cartographer::common::FromSeconds(duration);
                for (Eigen::Vector4f& point : point_cloud.points)
                    point[3] -= duration;
            }
            return std::make_tuple(point_cloud, timestamp);
        }

2. 提供数据到Cartographer中

在分析函数HandleMultiEchoLaserScanMessage的时候我们看到,将ROS的消息数据转换成为Cartographer的点云数据之后, 就调用函数HandleLaserScan来将处理后的数据喂给Cartographer进行后序的处理了。下面是HandleLaserScan的代码片段,在函数的一开始先确认一下输入的点云非空, 并且最后一个点云数据的时间小于等于0。

        void SensorBridge::HandleLaserScan(const std::string& sensor_id, const carto::common::Time time, const std::string& frame_id, const carto::sensor::PointCloudWithIntensities& points) {
            if (points.points.empty())
                return;
            CHECK_LE(points.points.back()[3], 0);

然后根据配置变量num_subdivisions_per_laser_scan_在一个for循环中将点云数据拆分为若干段。第6和第7行计算分段的起始和结束索引,在第8行中构建分段数据。 第9,10行是为了跳过同一个分段中的元素。

            for (int i = 0; i != num_subdivisions_per_laser_scan_; ++i) {
                const size_t start_index = points.points.size() * i / num_subdivisions_per_laser_scan_;
                const size_t end_index = points.points.size() * (i + 1) / num_subdivisions_per_laser_scan_;
                carto::sensor::TimedPointCloud subdivision(points.points.begin() + start_index, points.points.begin() + end_index);
                if (start_index == end_index)
                    continue;

接着参考分段中最后一个数据的时间调整其他数据的时间,但在该操作之前需要先确认当前的数据没有过时。成员容器sensor_to_previous_subdivision_time_中以sensor_id为索引记录了最新的数据产生时间, 如果分段的时间落后于记录值,将抛弃所对应的数据。

                const double time_to_subdivision_end = subdivision.back()[3];
                const carto::common::Time subdivision_time = time + carto::common::FromSeconds(time_to_subdivision_end);
                auto it = sensor_to_previous_subdivision_time_.find(sensor_id);
                if (it != sensor_to_previous_subdivision_time_.end() && it->second >= subdivision_time) {
                    // LOG(WARNING)
                    continue;
                }
                sensor_to_previous_subdivision_time_[sensor_id] = subdivision_time;
                for (Eigen::Vector4f& point : subdivision)
                    point[3] -= time_to_subdivision_end;
                CHECK_EQ(subdivision.back()[3], 0);

完成分段并调整了时间之后,就可以调用函数HandleRangefinder将分段数据喂给Cartographer了。

                HandleRangefinder(sensor_id, subdivision_time, frame_id, subdivision);
            }
        }

下面是HandleRangefinder的实现,在该函数中先通过tf_bridge_对象查询传感器坐标系相对于机器人坐标系之间的坐标变换,记录在对象sensor_to_tracking中。 只要该对象不是空指针,就说明成功找到转换关系,然后就可以通过对象trajectory_builder_添加传感器数据了。 这个trajectory_builder_实际上就是由map_builder对象提供的一个对象。

        void SensorBridge::HandleRangefinder(const std::string& sensor_id, const carto::common::Time time,
                                             const std::string& frame_id, const carto::sensor::TimedPointCloud& ranges) {
            const auto sensor_to_tracking = tf_bridge_.LookupToTracking(time, CheckNoLeadingSlash(frame_id));
            if (sensor_to_tracking != nullptr) {
                trajectory_builder_->AddSensorData(sensor_id, 
                                                   carto::sensor::TimedPointCloudData{time, sensor_to_tracking->translation().cast(), carto::sensor::TransformTimedPointCloud(ranges, sensor_to_tracking->cast<float>())}
                                                  );
            }
        }

终于,我们将激光传感器的数据喂给了Cartographer。

3. 完

本文中通过分析SensorBridge处理激光传感器数据的函数,我们发现它先通过函数ToPointCloudWithIntensities将ROS的消息数据转换成Cartographer中的点云数据, 然后根据配置将激光的扫描数据分段,并将拆分后的数据喂给Cartographer提供的轨迹构建器trajectory_builder_。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1