Local SLAM的业务主线——AddRangeData
现在我们已经了解了子图的概念、 占用栅格的数据结构和更新方式。 接下来,让我们来一起分析一下LocalTrajectoryBuilder2D的成员函数AddRangeData。 我希望通过分析这个函数,能够让我们从总体上了解Local SLAM的业务逻辑。
让我们再次祭出关于Local SLAM的框图。 它有扫描匹配器(Scan Matiching),以激光雷达的扫描数据和位姿估计为输入,使用Ceres库完成扫描匹配,输出位姿的观测值。一方面反馈给位姿估计器用于修正估计值, 另一方面提供给运动滤波器(Motion Filter)用于判定机器人是否产生了运动。如果产生了运动,则将此时的点云数据插入到当前正在维护的子图中。 同时输出插入的结果,包括时间、位姿、子图、扫描数据等信息。
1. 总体逻辑
下面是LocalTrajectoryBuilder2D的成员函数AddRangeData的代码片段。 该函数返回的就是一个MatchingResult的对象,看他的字面意思就是扫描匹配后的结果,实际上该类型中有一个字段insertion_result用于描述把扫描数据插入子图的结果。 该函数基本上完成了整个Local SLAM的业务。这个函数有两个输入参数,其中sensor_id是激光雷达的索引,第二个参数是未经时间同步的扫描数据, 它应该是从ROS系统中转换过来的。
std::unique_ptr<LocalTrajectoryBuilder2D::MatchingResult>
LocalTrajectoryBuilder2D::AddRangeData(const std::string& sensor_id, const sensor::TimedPointCloudData& unsynchronized_data) {
在函数的一开始,先把输入的索引和数据记录在数据收集器range_data_collator_中,同时我们将得到一个做了时间同步的扫描数据。检查一下数据,如果为空就报错退出。 检查通过之后,就用一个临时的对象time记录下同步时间。
auto synchronized_data = range_data_collator_.AddRangeData(sensor_id, unsynchronized_data);
if (synchronized_data.ranges.empty()) {
LOG(INFO) << "Range data collator filling buffer.";
return nullptr;
}
const common::Time& time = synchronized_data.time;
接下来调用函数InitializeExtrpolator构造位姿估计器。如果配置中说明不使用IMU估计位姿,那么就需要根据扫描匹配的结果估计位姿,要求AddRangeData完成位姿估计器的构造。 如果使用IMU进行估计,那么将在接收到IMU数据之后开始构造位姿估计器,这点可以通过追踪函数AddImuData看到。
if (!options_.use_imu_data())
InitializeExtrapolator(time);
if (extrapolator_ == nullptr) {
// Until we've initialized the extrapolator with our first IMU message, we
// cannot compute the orientation of the rangefinder.
LOG(INFO) << "Extrapolator not yet initialized.";
return nullptr;
}
检查扫描数据的时间确保最后一个数据的时间偏移量大于等于0, 关于扫描数据的时间可以回头看使用SensorBridge转换激光传感器数据一文。 然后获取第一个数据点的绝对时间,将它与位姿估计器的时间对比来判定位姿估计器是否完成了初始化操作。
CHECK_LE(synchronized_data.ranges.back().point_time[3], 0.f);
const common::Time time_first_point = time + common::FromSeconds(synchronized_data.ranges.front().point_time[3]);
if (time_first_point < extrapolator_->GetLastPoseTime()) {
LOG(INFO) << "Extrapolator is still initializing.";
return nullptr;
}
如果历史没有累积数据,就用成员变量accumulation_started_记录下当前的时间,作为跟踪轨迹的开始时刻。
if (num_accumulated_ == 0)
accumulation_started_ = std::chrono::steady_clock::now();
接着创建了一个临时的容器range_data_poses用于记录各个扫描点所对应的机器人位姿,并为它申请了与测量点数量相同的内存空间。变量warned用于控制警告信息的打印。
std::vector range_data_poses;
range_data_poses.reserve(synchronized_data.ranges.size());
bool warned = false;
该函数在一个for循环中通过位姿估计器获取各个扫描点对应的机器人位姿。在循环的一开始,先计算测量时间,并与位姿估计器的时间对比。如果时间滞后,在给出了警告信息后, 盲目的调整时间,以防止因果倒置的问题。如果一切正常,我们就可以通过对象extrapolator_和测量时间获取对应的位姿。
for (const auto& range : synchronized_data.ranges) {
common::Time time_point = time + common::FromSeconds(range.point_time[3]);
if (time_point < extrapolator_->GetLastExtrapolatedTime()) {
if (!warned) {
LOG(ERROR) << "Timestamp of individual range data point jumps backwards from " << extrapolator_->GetLastExtrapolatedTime() << " to " << time_point;
warned = true;
}
time_point = extrapolator_->GetLastExtrapolatedTime();
}
range_data_poses.push_back(extrapolator_->ExtrapolatePose(time_point).cast());
}
如果我们还没有累积过扫描数据,就重置对象accumulated_range_data_。
if (num_accumulated_ == 0)
accumulated_range_data_ = sensor::RangeData{{}, {}, {}};
在接下来的for循环中,抛弃掉所有测量距离小于配置min_range的hit点,并把那些超过配置max_range的测量点划归miss点。在第43和44行中, 通过左乘机器人位姿,将机器人坐标系下的传感器位姿,和激光测量的hit点位姿,转换到局部地图的坐标系下。然后在第45和46行中计算测量距离, 实际上这个计算有点重复了,因为ROS系统中原始的激光数据就是距离信息,这里来回转换了一遍有点多余。最后在47到52行的if-else语句中,完成hit点和miss点的筛选工作。 遍历所有点之后,累加计数num_accumulated_。
for (size_t i = 0; i < synchronized_data.ranges.size(); ++i) {
const Eigen::Vector4f& hit = synchronized_data.ranges[i].point_time;
const Eigen::Vector3f origin_in_local = range_data_poses[i] * synchronized_data.origins.at(synchronized_data.ranges[i].origin_index);
const Eigen::Vector3f hit_in_local = range_data_poses[i] * hit.head<3>();
const Eigen::Vector3f delta = hit_in_local - origin_in_local;
const float range = delta.norm();
if (range >= options_.min_range()) {
if (range <= options_.max_range())
accumulated_range_data_.returns.push_back(hit_in_local);
else
accumulated_range_data_.misses.push_back(origin_in_local + options_.missing_data_ray_length() / range * delta);
}
}
++num_accumulated_;
以上的这些工作无非是将Cartographer系统中原始的激光点云数据转换成占用栅格和插入器需要的RangeData类型的数据。而扫描匹配、运动过滤、更新子图的工作都是在下面的代码片段中实现的。
每当累积的传感器数据数量超过了配置值num_accumulated_range_data,就会调用函数AddAccumulatedRangeData完成Local SLAM的几项核心任务,并返回记录了子图插入结果的扫描匹配结果。 在调用该函数之前,先将计数清零,并通过位姿估计器获取重力的方向,记录下当前的累积位姿。
if (num_accumulated_ >= options_.num_accumulated_range_data()) {
num_accumulated_ = 0;
const transform::Rigid3d gravity_alignment = transform::Rigid3d::Rotation(extrapolator_->EstimateGravityOrientation(time));
accumulated_range_data_.origin = range_data_poses.back().translation();
return AddAccumulatedRangeData(time,
TransformToGravityAlignedFrameAndFilter(gravity_alignment.cast() * range_data_poses.back().inverse(), accumulated_range_data_),
gravity_alignment);
}
通过上面的代码片段,可以看到在给函数AddAccumulatedRangeData传参的时候调用了函数TransformToGravityAlignedFrameAndFilter。这个函数主要是以重力方向为参考修正传感器数据后进行体素化滤波。 关于这些先略过,以后会在附录中予以介绍。
下面的代码片段是函数AddAccumulatedRangeData的实现。它有三个参数,其中time是当前同步时间,gravity_aligned_range_data则是经过重力修正后的传感器数据, gravity_alignment是重力方向。在函数的一开始先检查输入的数据不空。
std::unique_ptr<LocalTrajectoryBuilder2D::MatchingResult> LocalTrajectoryBuilder2D::AddAccumulatedRangeData(
const common::Time time,
const sensor::RangeData& gravity_aligned_range_data,
const transform::Rigid3d& gravity_alignment) {
if (gravity_aligned_range_data.returns.empty()) {
LOG(WARNING) << "Dropped empty horizontal range data.";
return nullptr;
}
计算重力修正之后的机器人位姿,然后通过函数ScanMatch进行扫描匹配,并返回新的位姿估计。
const transform::Rigid3d non_gravity_aligned_pose_prediction = extrapolator_->ExtrapolatePose(time);
const transform::Rigid2d pose_prediction = transform::Project2D(non_gravity_aligned_pose_prediction * gravity_alignment.inverse());
std::unique_ptr<transform::Rigid2d> pose_estimate_2d = ScanMatch(time, pose_prediction, gravity_aligned_range_data);
if (pose_estimate_2d == nullptr) {
LOG(WARNING) << "Scan matching failed.";
return nullptr;
}
然后将新的位姿估计反馈给位姿估计器对象extrapolator_。
const transform::Rigid3d pose_estimate = transform::Embed3D(*pose_estimate_2d) * gravity_alignment;
extrapolator_->AddPose(time, pose_estimate);
接着将传感器的数据转换到新的位姿下,通过函数InsertIntoSubmap将新的扫描数据插入到子图中。
sensor::RangeData range_data_in_local = TransformRangeData(gravity_aligned_range_data, transform::Embed3D(pose_estimate_2d->cast<float>()));
std::unique_ptr<InsertionResult> insertion_result = InsertIntoSubmap(time, range_data_in_local, gravity_aligned_range_data, pose_estimate, gravity_alignment.rotation());
然后记下累积时间,并返回匹配结果。kLocalSlamLatencyMetric应该是用来监视延迟的。
auto duration = std::chrono::steady_clock::now() - accumulation_started_;
kLocalSlamLatencyMetric->Set(std::chrono::duration_cast(duration).count());
return common::make_unique<MatchingResult>(MatchingResult{time, pose_estimate,
std::move(range_data_in_local),
std::move(insertion_result)});
}
2. 扫描匹配
我们看到AddAccumulatedRangeData调用函数ScanMatch进行扫描匹配,主要是将当前的传感器数据与当前维护的子图进行匹配,寻找一个位姿估计使得传感器数据能够尽可能的与地图匹配上。 这是一个最优化的问题,Cartographer主要通过ceres库求解。
下面是函数ScanMatch的代码片段,它有三个参数,分别记录了参考时间、位姿估计器预测的位姿、经过重力修正之后的扫描数据。 在函数的一开始先获取对象active_submaps_中维护的用于扫描匹配的旧图。
std::unique_ptr<transform::Rigid2d> LocalTrajectoryBuilder2D::ScanMatch(const common::Time time,
const transform::Rigid2d& pose_prediction,
const sensor::RangeData& gravity_aligned_range_data) {
std::shared_ptr<const Submap2D> matching_submap = active_submaps_.submaps().front();
接着构建了一个自适应的体素滤波器,再次对输入的扫描数据进行了一次过滤。
sensor::AdaptiveVoxelFilter adaptive_voxel_filter(options_.adaptive_voxel_filter_options());
const sensor::PointCloud filtered_gravity_aligned_point_cloud = adaptive_voxel_filter.Filter(gravity_aligned_range_data.returns);
if (filtered_gravity_aligned_point_cloud.empty())
return nullptr;
然后以输入的位姿估计器预测的位姿作为扫描匹配器迭代的初始位姿。我们还可以通过配置项要求使用对象real_time_correlative_scan_matcher_所实现的一种在线的相关性扫描匹配算法, 进一步的对位姿估计进行优化以得到一个较好的迭代初值。
transform::Rigid2d initial_ceres_pose = pose_prediction;
if (options_.use_online_correlative_scan_matching()) {
CHECK_EQ(options_.submaps_options().grid_options_2d().grid_type(), proto::GridOptions2D_GridType_PROBABILITY_GRID);
double score = real_time_correlative_scan_matcher_.Match(pose_prediction, filtered_gravity_aligned_point_cloud,
*static_cast<const ProbabilityGrid*>(matching_submap->grid()), &initial_ceres_pose);
kFastCorrelativeScanMatcherScoreMetric->Observe(score);
}
给定了迭代初值之后,就可以通过对象ceres_scan_matcher_把扫描匹配问题描述成为一个最小二乘的问题,使用优化库ceres提供的工具寻优,得到一个使观测数据出现的概率最大化的位姿估计。 该估计将被当作机器人的实际位姿参与后序的计算。寻优的结果将保存在对象pose_observation中,临时对象summary则记录了优化迭代的过程。
auto pose_observation = common::make_unique<transform::Rigid2d>();
ceres::Solver::Summary summary;
ceres_scan_matcher_.Match(pose_prediction.translation(), initial_ceres_pose, filtered_gravity_aligned_point_cloud,
*matching_submap->grid(), pose_observation.get(), &summary);
如果我们成功的求解了扫描匹配问题,那么就计算一下残差通过判据对象kCeresScanMatcherCostMetric,kScanMatcherResidualDistanceMetric和kScanMatcherResidualAngleMetric检查一下计算结果。
if (pose_observation) {
kCeresScanMatcherCostMetric->Observe(summary.final_cost);
double residual_distance = (pose_observation->translation() - pose_prediction.translation()).norm();
kScanMatcherResidualDistanceMetric->Observe(residual_distance);
double residual_angle = std::abs(pose_observation->rotation().angle() - pose_prediction.rotation().angle());
kScanMatcherResidualAngleMetric->Observe(residual_angle);
}
最后返回优化之后的位姿估计。
return pose_observation;
}
3. 插入子图
完成了扫描匹配之后,就可以调用函数InsertIntoSubmap将传感器数据插入到当前正在维护的子图中了。下面是该函数的代码片段, 该函数有5个参数。其中range_data_in_local是在优化之后的位姿估计下观测到的hit点和miss点云数据。而gravity_aligned_range_data中记录的则是扫描匹配之前执行了重力修正的扫描数据。 pose_estimate则是优化之后的位姿估计,gravity_alignment描述了重力方向。
std::unique_ptr<LocalTrajectoryBuilder2D::InsertionResult>
LocalTrajectoryBuilder2D::InsertIntoSubmap(const common::Time time, const sensor::RangeData& range_data_in_local,
const sensor::RangeData& gravity_aligned_range_data,
const transform::Rigid3d& pose_estimate,
const Eigen::Quaterniond& gravity_alignment) {
在函数的一开始,先通过运动滤波器来判定但前是否发生了明显的运动,若未发生则直接返回退出。这样可以减少很多数据量。
if (motion_filter_.IsSimilar(time, pose_estimate))
return nullptr;
在通知对象active_submaps_接收新的数据更新地图之前,先把其维护的子图暂时保存在临时的容器insertion_submaps中。
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps;
for (const std::shared_ptr& submap : active_submaps_.submaps())
insertion_submaps.push_back(submap);
active_submaps_.InsertRangeData(range_data_in_local);
接下来出于闭环检测的目的,构建一个自适应的体素滤波器对重力修正的扫描数据进行滤波。
sensor::AdaptiveVoxelFilter adaptive_voxel_filter(options_.loop_closure_adaptive_voxel_filter_options());
const sensor::PointCloud filtered_gravity_aligned_point_cloud = adaptive_voxel_filter.Filter(gravity_aligned_range_data.returns);
最后构建一个InsertionResult对象并返回。
return common::make_unique<InsertionResult>(InsertionResult{
std::make_shared<const TrajectoryNode::Data>(TrajectoryNode::Data{
time,
gravity_alignment,
filtered_gravity_aligned_point_cloud,
{}, // 'high_resolution_point_cloud' is only used in 3D.
{}, // 'low_resolution_point_cloud' is only used in 3D.
{}, // 'rotational_scan_matcher_histogram' is only used in 3D.
pose_estimate}),
std::move(insertion_submaps)});
}
4. 完
本文中,我们详细分析了类型LocalTrajectoryBuilder2D的成员函数AddRangeData以及它直接调用的函数AddAccumulatedRangeData。这两个函数控制了整个Local SLAM的业务逻辑, 函数AddRangeData主要是在筛选激光点云并将它们划分为hit点和miss点,AddAccumulatedRangeData则调用函数ScanMatch和InsertIntoSubmap完成扫描匹配和子图更新的操作。