位姿图的创建与更新
在上一篇文章中,我们分析了Cartographer中的后端优化问题以及位姿图的形式, 同时初步了解了类PoseGraph2D的成员变量。 本文中我们将延续前后端的桥梁GlobalTrajectoryBuilder的思路, 来深入研究一下Cartographer是如何创建和更新位姿图的。
1. 位姿图的构建与初始化
在对象map_builder的构造函数中,看到根据我们的demo的配置, 该对象构建了一个类型为PoseGraph2D的对象用于2D建图。下面的代码是PoseGraph2D的构造函数,它的函数体是空的什么也没有做,只是在构造列表中完成了一些成员变量构造。 它有三个输入参数,其中options是从配置文件中装载的关于位姿图的配置项,optimization_problem是一个智能指针指向后端优化问题求解器,thread_pool则是一个线程池。
PoseGraph2D::PoseGraph2D(const proto::PoseGraphOptions& options,
std::unique_ptr<optimization::OptimizationProblem2D> optimization_problem,
common::ThreadPool* thread_pool)
: options_(options),
optimization_problem_(std::move(optimization_problem)),
constraint_builder_(options_.constraint_builder_options(), thread_pool) {}
在开始建图之前还在对象map_builder的接口函数AddTrajectoryBuilder中对位姿图做了一些配置。 根据轨迹的配置,我们可以选择通过接口AddTrimmer给位姿图对象添加OverlappingSubmapsTrimmer2D类型和PureLocalizationTrimmer类型的修饰器,分别用于根据子图间重叠的部分修饰地图和纯粹的定位。 我们暂且不管所谓的修饰器是个什么鬼,先来看一下这个函数AddTrimmer。其实现如下面的代码所示,其输入参数就是一个指向修饰器对象的智能指针。 在第5行中通过函数AddWorkItem将一个lambda表达式添加到工作队列中。
void PoseGraph2D::AddTrimmer(std::unique_ptr<PoseGraphTrimmer> trimmer) {
common::MutexLocker locker(&mutex_);
// C++11 does not allow us to move a unique_ptr into a lambda.
PoseGraphTrimmer* const trimmer_ptr = trimmer.release();
AddWorkItem([this, trimmer_ptr]() REQUIRES(mutex_) { trimmers_.emplace_back(trimmer_ptr); });
}
lambda表达式是C++11标准之后引入的特性,与其他语言中提到的匿名函数差不多,可以理解为一个没有名称的内联函数。一个lambda表达式具有如下的语法形式:
$$ [capture\ list]\ (parameter\ list)\ \text{->}\ return\ type\ \{\ function\ body\ \} $$这里的lambda表达式描述的是一个void()类型的函数,它没有返回值也没有参数列表,所以上述语法中的"\(\text{->}\ return\ type\)"部分就不存在。 \([capture\ list]\)中获取的是在函数体中用到的一些变量。捕获this指针,是为了能够访问成员变量trimmers_,而trimmer_ptr则是从输入参数中获取的修饰器对象指针。 这个表达式的作用就是将传参的修饰器指针放入容器trimmers_中。
void PoseGraph2D::AddWorkItem(const std::function<void()>& work_item) {
if (work_queue_ == nullptr)
work_item();
else
work_queue_->push_back(work_item);
}
但是lambda表达式并不会立即执行,它将被当做一个类似于函数指针或者仿函数这样的可执行对象传参给函数AddWorkItem,在合适的条件下被调用。 右侧是该函数的代码,可以看到它的输入参数就是一个能够兼容上述lambda表达式的void()类型的可调用对象。
其工作内容也十分简单,如果工作队列(work_queue_)存在就将任务(work_item)放到队列中,如果不存在就直接执行。这里的工作队列是PoseGraph2D的一个成员变量,它的完整定义如下所示, 是一个指向双端队列的智能指针。在刚刚的构造函数中没有创建过这一对象,所以一开始都是nullptr。至于该对象什么时候被创建,它所记录的工作任务又是怎么被安排执行的, 我们将在后文介绍工作队列的时候详细分析。
std::unique_ptr<std::deque<std::function<void()>>> work_queue_ GUARDED_BY(mutex_);
此外,在开始建图之前还可以通过配置指定新的运动轨迹的起始位姿。在该配置下将调用PoseGraph2D的成员函数SetInitialTrajectoryPose完成初始位姿配置,其实现如下所示, 它有四个参数。其中from_trajectory_id是新轨迹的索引。剩下三个参数不是很理解,看字面意思是time时刻新轨迹起点相对于参考轨迹(to_trajectory_id)的位姿(pose)。 在函数中,直接根据后三个参数创建一个InitialTrajectoryPose类型的对象,放置到容器initial_trajectory_poses_中。
void PoseGraph2D::SetInitialTrajectoryPose(const int from_trajectory_id, const int to_trajectory_id,
const transform::Rigid3d& pose, const common::Time time) {
common::MutexLocker locker(&mutex_);
initial_trajectory_poses_[from_trajectory_id] = InitialTrajectoryPose{to_trajectory_id, pose, time};
}
2. 添加节点
在研究类型GlobalTrajectoryBuilder中用于添加点云数据的接口AddSensorData时候, 我们看到在完成Local SLAM的任务之后,就会通过位姿图对象的函数AddNode将更新结果喂给后端。下面是该函数的代码片段,其中constant_data记录了更新子图时的点云信息以及相对位姿, trajectory_id记录了轨迹索引,insertion_submaps则是更新的子图。
NodeId PoseGraph2D::AddNode(std::shared_ptr<const TrajectoryNode::Data> constant_data,
const int trajectory_id,
const std::vector<std::shared_ptr<const Submap2D>>& insertion_submaps) {
在函数一开始先将局部位姿转换成为世界坐标系下的位姿。函数GetLocalToGlobalTransform根据最新一次优化之后的子图位姿生成局部坐标系到世界坐标系的坐标变换关系。
const transform::Rigid3d optimized_pose(GetLocalToGlobalTransform(trajectory_id) * constant_data->local_pose);
接下来先对信号量加锁,以保证多线程的安全。并通过函数AddTrajectoryIfNeeded来维护各个轨迹之间的连接关系,根据输入的数据和刚刚生成的全局位姿构建一个TrajectoryNode的对象, 并将之添加到节点的容器trajectory_nodes_中。至此我们就添加了一个新的节点,并为之分配了一个NodeId。
common::MutexLocker locker(&mutex_);
AddTrajectoryIfNeeded(trajectory_id);
const NodeId node_id = trajectory_nodes_.Append(trajectory_id, TrajectoryNode{constant_data, optimized_pose});
++num_trajectory_nodes_;
根据Local SLAM中子图的维护方式和子图更新数据的传递过程,我们可以认为输入参数insertion_submaps.back()中记录了最新的子图。 所以下面的条件语句中先查询一下容器submap_data_中是否有轨迹索引为trajectory_id的子图,再判定insertion_submaps.back()中的子图是否是新生成的。 若是则将之添加到容器submap_data_中,同时分配一个SubmapId。
if (submap_data_.SizeOfTrajectoryOrZero(trajectory_id) == 0 ||
std::prev(submap_data_.EndOfTrajectory(trajectory_id))->data.submap != insertion_submaps.back()) {
const SubmapId submap_id = submap_data_.Append(trajectory_id, InternalSubmapData());
submap_data_.at(submap_id).submap = insertion_submaps.back();
}
最后通过lambda表达式和函数AddWorkItem注册一个为新增节点添加约束的任务ComputeConstraintsForNode。根据Cartographer的思想, 在该任务下应当会将新增的节点与所有已经处于kFinished状态的子图进行一次匹配建立可能存在的闭环约束。 此外,当有新的子图进入kFinished状态时,还会将之与所有的节点进行一次匹配。所以这里会通过insertion_submaps.front()来查询旧图的更新状态。
const bool newly_finished_submap = insertion_submaps.front()->finished();
AddWorkItem([=]() REQUIRES(mutex_) {
ComputeConstraintsForNode(node_id, insertion_submaps, newly_finished_submap);
});
return node_id;
}
3. 添加约束
我们再来看一下函数ComputeConstraintsForNode,它除了有添加约束的作用,还会触发工作队列的构建和运行。下面是该函数的代码片段,它有三个参数。其中node_id是待更新的节点索引, insertion_submaps则是从Local SLAM一路传递过来的新旧子图,newly_finished_submap表示旧图是否结束更新了。
void PoseGraph2D::ComputeConstraintsForNode(const NodeId& node_id,
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps,
const bool newly_finished_submap) {
在函数的一开始先获取节点数据,并通过函数InitializeGlobalSubmapPoses获取新旧子图的索引。 实际上这个函数还是蛮长的,它除了获取ID之外还检查了新子图是否第一次被后端看见,若是则为之计算全局位姿并喂给后端优化器optimization_problem_。 我们将在后端优化过程一文中在回头来看这个函数。
const auto& constant_data = trajectory_nodes_.at(node_id).constant_data;
const std::vector<SubmapId> submap_ids = InitializeGlobalSubmapPoses(node_id.trajectory_id, constant_data->time, insertion_submaps);
接下来以旧图为参考,计算节点相对于子图的局部位姿\(\varepsilon_{ij}\),以及它在世界坐标系下的位姿\(\varepsilon_j^s\)。并将之提供给后端优化器optimization_problem_。
const SubmapId matching_id = submap_ids.front();
const transform::Rigid2d local_pose_2d = transform::Project2D(constant_data->local_pose * transform::Rigid3d::Rotation(constant_data->gravity_alignment.inverse()));
const transform::Rigid2d global_pose_2d = optimization_problem_->submap_data().at(matching_id).global_pose * constraints::ComputeSubmapPose(*insertion_submaps.front()).inverse() *local_pose_2d;
optimization_problem_->AddTrajectoryNode(matching_id.trajectory_id,
optimization::NodeSpec2D{constant_data->time, local_pose_2d,
global_pose_2d, constant_data->gravity_alignment});
然后为新增的节点和新旧子图之间添加INTRA_SUBMAP类型的约束。
for (size_t i = 0; i < insertion_submaps.size(); ++i) {
const SubmapId submap_id = submap_ids[i];
CHECK(submap_data_.at(submap_id).state == SubmapState::kActive);
submap_data_.at(submap_id).node_ids.emplace(node_id);
const transform::Rigid2d constraint_transform = constraints::ComputeSubmapPose(*insertion_submaps[i]).inverse() * local_pose_2d;
constraints_.push_back(Constraint{submap_id,
node_id,
{transform::Embed3D(constraint_transform),
options_.matcher_translation_weight(),
options_.matcher_rotation_weight()},
Constraint::INTRA_SUBMAP});
}
紧接着遍历所有已经处于kFinished状态的子图,建立它们与新增节点之间可能的约束。
for (const auto& submap_id_data : submap_data_) {
if (submap_id_data.data.state == SubmapState::kFinished) {
CHECK_EQ(submap_id_data.data.node_ids.count(node_id), 0);
ComputeConstraint(node_id, submap_id_data.id);
}
}
如果旧图切换到kFinished状态,则遍历所有已经进行过优化的节点,建立它们与旧图之间可能的约束。
if (newly_finished_submap) {
const SubmapId finished_submap_id = submap_ids.front();
InternalSubmapData& finished_submap_data = submap_data_.at(finished_submap_id);
CHECK(finished_submap_data.state == SubmapState::kActive);
finished_submap_data.state = SubmapState::kFinished;
ComputeConstraintsForOldNodes(finished_submap_id);
}
最后通知约束构建器新增节点的操作结束,增加计数器num_nodes_since_last_loop_closure_。41行中调用的函数DispatchOptimization对于工作队列的运转有着重要的作用。
constraint_builder_.NotifyEndOfNode();
++num_nodes_since_last_loop_closure_;
CHECK(!run_loop_closure_);
if (options_.optimize_every_n_nodes() > 0 && num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes())
DispatchOptimization();
}
4. 工作队列
我们已经看到函数AddWorkItem会根据工作队列是否存在,选择直接运行工作任务还是放到队列中等待以后执行。当时留下了两个问题,其一这个工作队列是什么时候被创建的?其二队列里的任务是如何调度的? 这一切都是从函数DispatchOptimization开始的,下面是该函数的实现代码。在该函数中,我们看到它会判定工作队列是否存在,如果不存在就创建一个对象。
void PoseGraph2D::DispatchOptimization() {
run_loop_closure_ = true;
// If there is a 'work_queue_' already, some other thread will take care.
if (work_queue_ == nullptr) {
work_queue_ = common::make_unique<std::deque<std::function<void()>>>();
constraint_builder_.WhenDone(std::bind(&PoseGraph2D::HandleWorkQueue, this, std::placeholders::_1));
}
}
需要额外关注一下上面代码中的第6行,它通过约束构建器的WhenDone接口注册了一个回调函数HandleWorkQueue。先暂时不管这个约束构建器,我们将有专门文章介绍它。 大致意思是当约束构建器在后台完成了一些工作之后,就会调用这个函数HandleWorkQueue,来调度队列里的任务。下面是这个函数的代码片段,它以约束构建器的结果为输入参数。 并在函数的一开始,将构建的约束添加到容器constraints_中,然后调用函数RunOptimization进行优化。
void PoseGraph2D::HandleWorkQueue(const constraints::ConstraintBuilder2D::Result& result) {
{
common::MutexLocker locker(&mutex_);
constraints_.insert(constraints_.end(), result.begin(), result.end());
}
RunOptimization();
如果提供了全局优化之后的回调函数,则调用回调函数。
if (global_slam_optimization_callback_) {
// 准备trajectory_id_to_last_optimized_submap_id和trajectory_id_to_last_optimized_node_id
global_slam_optimization_callback_(trajectory_id_to_last_optimized_submap_id,
trajectory_id_to_last_optimized_node_id);
}
然后,更新轨迹之间的连接关系,并调用修饰器完成地图的修饰。
common::MutexLocker locker(&mutex_);
for (const Constraint& constraint : result)
UpdateTrajectoryConnectivity(constraint);
TrimmingHandle trimming_handle(this);
for (auto& trimmer : trimmers_)
trimmer->Trim(&trimming_handle);
trimmers_.erase(std::remove_if(trimmers_.begin(), trimmers_.end(),
[](std::unique_ptr& trimmer) { return trimmer->IsFinished(); }), trimmers_.end());
此时,我们应当是已经完成了一次地图的后端优化。所以需要将计数器num_nodes_since_last_loop_closure_清零,并更新run_loop_closure_表示当前系统空闲没有进行闭环检测。
num_nodes_since_last_loop_closure_ = 0;
run_loop_closure_ = false;
接着就在一个for循环中处理掉work_queue_中所有等待的任务,这些任务主要是添加节点、添加传感器数据到位姿图中。
while (!run_loop_closure_) {
if (work_queue_->empty()) {
work_queue_.reset();
return;
}
work_queue_->front()();
work_queue_->pop_front();
}
有时还没有完全处理完队列中的所有任务,就因为状态run_loop_closure_再次为true开启新的闭环检测而退出。此时需要重新注册一下回调函数。
LOG(INFO) << "Remaining work items in queue: " << work_queue_->size();
// We have to optimize again.
constraint_builder_.WhenDone(std::bind(&PoseGraph2D::HandleWorkQueue, this, std::placeholders::_1));
}
关于这个run_loop_closure_, 我在文件pose_graph_2d.cc中搜索了一下该对象, 发现只在函数DispatchOptimization中将其置true。再回过头来看添加节点时放入工作队列的函数ComputeConstraintsForNode,该函数最后调用函数DispatchOptimization的条件语句, 可以看到当累积了一定数量的新节点后就会触发闭环检测。
5. 完
本文中,我们分析了位姿图PoseGraph2D的构造过程,以及它是如何将前端输出的子图更新信息以及激光雷达扫描的点云数据添加到位姿图中的。 我们看到PoseGraph2D通过一个约束构建器在后台完成闭环检测。每当前端有输出的时候,PoseGraph2D不一定立即计算约束更新位姿图的,而是放到一个工作队列中,当闭环检测结束后在调度工作队列完成更新。