首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

约束构建器——constraint_builder_

上一节中我们介绍了位姿图的更新过程,发现PoseGraph2D使用对象constraint_builder_在后台进行闭环检测, 建立约束。对象constraint_builder_通过线程池并行地完成任务,有效的提高了系统的运行效率。本文中,我们将详细分析约束构建器对象constraint_builder_。

1. ConstraintBuilder2D的成员变量和构造函数

Cartographer使用一个叫做ConstraintBuilder2D的类来进行闭环检测并构建约束。 根据它的注释,可以看出它主要是异步地计算约束。我们可以在任意调用MaybeAddConstraint、MaybeAddGlobalConstraint、NotifyEndOfNode之后,调用一次WhenDone接口,来注册回调函数结束一轮操作。 如此不断的重复这个过程就可以持续地进行闭环检测,添加必要的约束。整个过程在分析位姿图的更新过程时, 就已经有了初步的了解。下表是该类型中定义的成员变量。

对象名称 对象类型 说明
options_ ConstraintBuilderOptions 关于约束构建器的各种配置
thread_pool_ ThreadPoolInterface 线程池,用于并行地完成闭环检测
mutex_ common::Mutex 保护公共资源的互斥量
when_done_ std::unique_ptr<std::function<void(const Result&)>> 通过接口WhenDone注册的回调函数对象
num_started_nodes_ int 一个计数器,记录了当前需要考虑的轨迹节点数量。
num_finished_nodes_ int 一个计数器,记录了当前已完成约束计算的轨迹节点数量。
finish_node_task_ std::unique_ptr<common::Task> 完成轨迹节点约束计算的任务状态机
when_done_task_ std::unique_ptr<common::Task> WhenDone的任务状态机
constraints_ std::deque<std::unique_ptr<Constraint>> 用于保存后台计算的约束的双端队列
submap_scan_matchers_ std::map<SubmapId, SubmapScanMatcher> 记录各个子图的扫描匹配器的容器,以SubmapId为索引。
sampler_ common::FixedRatioSampler 采样器
ceres_scan_matcher_ scan_matching::CeresScanMatcher2D 基于Ceres库的扫描匹配器
score_histogram_ common::Histogram 扫描匹配得分的直方图

下面的代码片段是类ConstraintBuilder2D的构造函数,它有两个输入参数,分别记录了配置项和线程池对象。 这个配置项options早在Cartographer的ROS入口中就已经从配置文件中加载了。 而线程池对象thread_pool则指向地图构建器对象map_builder的一个成员变量。 在构造函数成员构造列表中,直接将这两个对象赋予了成员变量options_和thread_pool_。并且完成了任务状态机对象finish_node_task_和when_done_task_的初始构造, 同时根据参数配置完成了采样器和基于Ceres的扫描匹配器的构造。

        ConstraintBuilder2D::ConstraintBuilder2D(const constraints::proto::ConstraintBuilderOptions& options,
                        common::ThreadPoolInterface* const thread_pool)
            : options_(options), thread_pool_(thread_pool),
              finish_node_task_(common::make_unique<common::Task>()), when_done_task_(common::make_unique<common::Task>()),
              sampler_(options.sampling_ratio()), ceres_scan_matcher_(options.ceres_scan_matcher_options()) {}

下面的代码片段则是类ConstraintBuilder2D的析构函数,该函数实际没有什么具体的操作,只检查了各种状态,保证释放对象的时候没有任务在后台运行。

        ConstraintBuilder2D::~ConstraintBuilder2D() {
            common::MutexLocker locker(&mutex_);
            CHECK_EQ(finish_node_task_->GetState(), common::Task::NEW);
            CHECK_EQ(when_done_task_->GetState(), common::Task::NEW);
            CHECK_EQ(constraints_.size(), 0) << "WhenDone() was not called";
            CHECK_EQ(num_started_nodes_, num_finished_nodes_);
            CHECK(when_done_ == nullptr);
        }

此外,类ConstraintBuilder2D屏蔽了拷贝构造函数和拷贝赋值操作符。

        ConstraintBuilder2D(const ConstraintBuilder2D&) = delete;
        ConstraintBuilder2D& operator=(const ConstraintBuilder2D&) = delete;

2. MaybeAdd-WhenDone调用循环

我们知道Cartographer在后台通过一个线程池并行的进行闭环检测,计算约束。 Global SLAM的核心PoseGraph2D通过MaybeAdd-WhenDone调用循环来组织后台的闭环检测。 所谓的MaybeAdd-WhenDone调用循环是指,在任意调用MaybeAddConstraint、MaybeAddGlobalConstraint、NotifyEndOfNode之后,调用一次WhenDone,如此循环往复。

接口MaybeAddConstraint用于检查子图和路径节点之间是否存在可能的约束,如下面的代码片段所示,它有5个输入参数,其中submap_id和node_id分别是子图和路径节点的索引。 指针submap和constant_data分别指向了考察的子图对象和路径节点中记录了激光点云数据,需要注意的是这两个对象的生命周期应当能够覆盖后端优化的计算过程。 initial_relative_pose记录了路径节点相对于子图的初始位姿,提供了优化迭代的一个初值。

        void MaybeAddConstraint(const SubmapId& submap_id, const Submap2D* submap,
                                const NodeId& node_id, const TrajectoryNode::Data* const constant_data,
                                const transform::Rigid2d& initial_relative_pose) {

如果初始的相对位姿显示,路径节点与子图相差很远,就直接返回不再两者之间建立约束。这个距离阈值可以通过配置项max_constraint_distance来设定。这样做有两个好处, 其一可以一定程度上降低约束的数量,减少全局图优化的计算量,提高系统的运行效率; 其二,如果两者相差太远,很可能会受到累积误差的影响,导致添加错误的约束,给最终的全局优化带来负面的影响, 这样直接抛弃一些不太合理的约束,看似遗漏了很多信息,但对于优化结果而言可能是一件好事。

            if (initial_relative_pose.translation().norm() > options_.max_constraint_distance())
                return;

如果采样器暂停了,也将直接退出。我目前还不是很清楚这个采样器是干什么用的。

            if (!sampler_.Pulse()) return;

接下来通过互斥量对象mutex_加锁。然后检查回调函数对象when_done_是否存在,若存在则说明上一个闭环检测迭代还没有完全结束,所以通过日志报警。

            common::MutexLocker locker(&mutex_);
            if (when_done_)
                LOG(WARNING) << "MaybeAddConstraint was called while WhenDone was scheduled.";

然后就向容器constraints_中添加新的约束。至于那个kQueueLengthMetric我们暂时先不去管它。

            constraints_.emplace_back();
            kQueueLengthMetric->Set(constraints_.size());
            auto* const constraint = &constraints_.back();

接着通过函数DispatchScanMatcherConstruction构建了一个扫描匹配器,一会儿我们就会看到,这个函数并没有完成扫描匹配器的构建,而是通过lambda表达式和线程池推后实现的。

            const auto* scan_matcher = DispatchScanMatcherConstruction(submap_id, submap->grid());

构建一个线程池所用的任务对象constraint_task,并通过接口SetWorkItem和lambda表达式具体描述工作内容。其lambda表达式中调用的函数ComputeConstraint具体完成了约束的计算。

            auto constraint_task = common::make_unique<common::Task>();
            constraint_task->SetWorkItem([=]() EXCLUDES(mutex_) {
                ComputeConstraint(submap_id, submap, node_id, false, /* match_full_submap */
                      constant_data, initial_relative_pose, *scan_matcher,
                      constraint);
            });

由于具体的约束计算过程需要用到刚刚构建的扫描匹配器对象scan_matcher,而该对象的核心也是在线程池中的一个Task完成构建的。 所以这里通过给Task添加依赖关系来保证,在进行约束计算之前扫描匹配器就已经完成构建和初始化了。

            constraint_task->AddDependency(scan_matcher->creation_task_handle);

最后将计算约束的任务添加到线程池的调度队列中,并将其设置为完成轨迹节点约束计算任务的依赖,保证在完成了所有计算约束的任务之后才会执行constraint_task的计算任务。

            auto constraint_task_handle = thread_pool_->Schedule(std::move(constraint_task));
            finish_node_task_->AddDependency(constraint_task_handle);
        }

接口MaybeAddGlobalConstraint的功能类似, 也是计算子图和路径节点之间是否存在可能的约束。所不同的是,该接口只有四个输入参数,没有提供初始相对位姿, 而且它的扫描匹配是在整个子图上进行的。该接口与MaybeAddConstraint的套路基本一样,下面的代码片段重点列出了不同之处:

        void MaybeAddGlobalConstraint(const SubmapId& submap_id, const Submap2D* submap,
                                const NodeId& node_id, const TrajectoryNode::Data* const constant_data) {
            /*  */
                ComputeConstraint(submap_id, submap, node_id, false, /* match_full_submap */
                      constant_data, initial_relative_pose, *scan_matcher,
                      constraint);
            /*  */
        }

回顾位姿图的更新过程,我们会发现闭环检测是在每次向后端添加路径节点的时候触发的。 除了要建立路径节点与新旧子图之间的内部约束之外,还要与所有处于kFinished状态的子图进行一次匹配计算可能存在的约束,如果旧子图切换到kFinished状态还需要将之与所有路径节点进行匹配。完成了这些操作之后, 它就会调用接口NotifyEndOfNode, 通知对象constraint_builder_完成了一个路径节点的插入工作,下面的代码描述了该接口的实现。

        void ConstraintBuilder2D::NotifyEndOfNode() {
            common::MutexLocker locker(&mutex_);
            CHECK(finish_node_task_ != nullptr);
            finish_node_task_->SetWorkItem([this] {
                common::MutexLocker locker(&mutex_);
                ++num_finished_nodes_;
            });
            auto finish_node_task_handle = thread_pool_->Schedule(std::move(finish_node_task_));
            finish_node_task_ = common::make_unique();
            when_done_task_->AddDependency(finish_node_task_handle);
            ++num_started_nodes_;
        }

根据第4到7行中的lambda表达式,可以看到当完成了轨迹节点的约束计算后,就会增加计数器num_finished_nodes_的计数,标识着一个新的路径节点被添加到后端。 在第8行中通过线程池的Schedule接口,将任务对象finish_node_task_添加到线程池的调度队列中,并在第10行中将该对象添加到WhenDone任务的依赖列表中。然后在第9行重新构建一个finish_node_task_对象。 最后控制计数器num_started_nodes_自加。

最后,我们再来看一下接口WhenDone。 如下面的代码所示,该函数只有一个输入参数,记录了当所有的闭环检测任务结束之后的回调函数。在函数的第4行中,用指针when_done_记录下回调函数。 然后在第6行中指定when_done_task_的具体工作内容,调用函数RunWhenDoneCallback。第7、8行中将when_done_task_添加到线程池的调度队列中,并新建一个任务状态机对象。

        void ConstraintBuilder2D::WhenDone(const std::function<void(const ConstraintBuilder2D::Result&)>& callback) {
            common::MutexLocker locker(&mutex_);
            CHECK(when_done_ == nullptr);
            when_done_ = common::make_unique<std::function<void(const Result&)>>(callback);
            CHECK(when_done_task_ != nullptr);
            when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
            thread_pool_->Schedule(std::move(when_done_task_));
            when_done_task_ = common::make_unique();
        }

3. 其他辅助函数

在刚刚介绍的MaybeAdd-WhenDone调用循环中,我们看到了一些辅助功能的函数。下面我们将详细介绍DispatchScanMatcherConstruction、ComputeConstraint、RunWhenDoneCallback三个函数。

函数DispatchScanMatcherConstruction用于为新增的子图创建一个扫描匹配器,成功创建的匹配器将以submap_id为索引被保存在容器submap_scan_matchers_中。下面是该函数的代码片段, 可以看到它有两个输入参数,其中submap_id记录了子图的索引,而grid则是子图的占用栅格表示。在函数的一开始,先通过容器submap_scan_matchers_查询一下是否曾经为该子图创建过扫描匹配器。 若是则直接返回所对应的扫描匹配器对象。

        const ConstraintBuilder2D::SubmapScanMatcher*
        ConstraintBuilder2D::DispatchScanMatcherConstruction(const SubmapId& submap_id, const Grid2D* const grid) {
            if (submap_scan_matchers_.count(submap_id) != 0)
                return &submap_scan_matchers_.at(submap_id);

然后扩展容器submap_scan_matchers_,并将输入的子图占用栅格填充到新扩展的扫描匹配器中。

            auto& submap_scan_matcher = submap_scan_matchers_[submap_id];
            submap_scan_matcher.grid = grid;

接着,从配置项中获取扫描匹配器的配置。同时创建一个临时的任务对象scan_matcher_task,在它的lambda表达式中,使用刚刚获取的配置和占用栅格创建了一个FastCorrelativeScanMatcher2D类型的对象。 该对象是Cartographer通过分支定界进行闭环检测的算法实现,我们将在下一篇文章中详细介绍。

            auto& scan_matcher_options = options_.fast_correlative_scan_matcher_options();
            auto scan_matcher_task = common::make_unique();
            scan_matcher_task->SetWorkItem([&submap_scan_matcher, &scan_matcher_options]() {
                submap_scan_matcher.fast_correlative_scan_matcher =
                    common::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(*submap_scan_matcher.grid, scan_matcher_options);
            });

最后,将构建扫描匹配器的任务放置到线程池的调度队列中,并将刚刚扩展得到的匹配器对象返回。

            submap_scan_matcher.creation_task_handle = thread_pool_->Schedule(std::move(scan_matcher_task));
            return &submap_scan_matchers_.at(submap_id);
        }

函数ComputeConstraint在后台线程中完成子图与路径节点之间可能的约束计算。如下面的代码片段所示,它有很多输入参数。 其中submap_id、submap、node_id、constant_data分别是待考察的子图和路径节点的索引和数据内容。布尔变量match_full_submap用于指示是否完成遍历子图。 initial_relative_pose描述了路径节点相对于子图的初始位姿,submap_scan_matcher则是进行扫描匹配时所用的匹配器对象,constraint则用于输出约束结果。

        void ConstraintBuilder2D::ComputeConstraint(const SubmapId& submap_id, const Submap2D* const submap,
                                                    const NodeId& node_id, bool match_full_submap,
                                                    const TrajectoryNode::Data* const constant_data,
                                                    const transform::Rigid2d& initial_relative_pose,
                                                    const SubmapScanMatcher& submap_scan_matcher,
                                                    std::unique_ptr<ConstraintBuilder2D::Constraint>* constraint) {

在函数的一开始,先构建几个临时变量。initial_pose用于描述在世界坐标系下路径节点与子图之间的相对位置关系。score和pose_estimate用于记录扫描匹配之后的得分和位姿估计。

            const transform::Rigid2d initial_pose = ComputeSubmapPose(*submap) * initial_relative_pose;
            float score = 0.;
            transform::Rigid2d pose_estimate = transform::Rigid2d::Identity();

然后根据输入参数match_full_submap选择不同的扫描匹配方式,如果没有匹配成功则直接退出。

            if (match_full_submap) {
                kGlobalConstraintsSearchedMetric->Increment();
                if (submap_scan_matcher.fast_correlative_scan_matcher->MatchFullSubmap(/* 省略参数 */, &score, &pose_estimate))
                    // 省略结果检查和日志相关代码
                else
                    return;
            } else {
                kConstraintsSearchedMetric->Increment();
                if (submap_scan_matcher.fast_correlative_scan_matcher->Match(/* 省略参数 */, &score, &pose_estimate))
                    // 省略结果检查和日志相关代码
                else
                    return;
            }

接着在一个局部的locker的保护下,将新获得的约束得分统计到一个直方图中。

            {
                common::MutexLocker locker(&mutex_);
                score_histogram_.Add(score);
            }

最后使用Ceres扫描匹配进一步的对刚刚构建的约束进行优化,并将这种新建的约束标记为INTER_SUBMAP类型。

            ceres::Solver::Summary unused_summary;
            ceres_scan_matcher_.Match(/* 省略参数 */, &pose_estimate, &unused_summary);
            const transform::Rigid2d constraint_transform = ComputeSubmapPose(*submap).inverse() * pose_estimate;
            constraint->reset(new Constraint{submap_id, node_id,
                                             {transform::Embed3D(constraint_transform),
                                              options_.loop_closure_translation_weight(),
                                              options_.loop_closure_rotation_weight()},
                                              Constraint::INTER_SUBMAP});
            // 省略输出日志相关的语句
        }

函数RunWhenDoneCallback是当一轮MaybeAdd-WhenDone任务结束后,用来调用WhenDone接口注册的回调函数的。下面是该函数的代码实现, 其工作内容无非是将MaybeAdd过程中得到的约束放到result对象中,然后清空约束列表constraints_,用临时变量callback记录下回调函数对象,并释放when_done_指针。最后将result作为参数传递给回调函数。

        void ConstraintBuilder2D::RunWhenDoneCallback() {
            Result result;
            std::unique_ptr<std::function<void(const Result&)>> callback;
            {
                common::MutexLocker locker(&mutex_);
                CHECK(when_done_ != nullptr);
                for (const std::unique_ptr& constraint : constraints_) {
                    if (constraint == nullptr) continue;
                    result.push_back(*constraint);
                }
                // 省略日志
                constraints_.clear();
                callback = std::move(when_done_);
                when_done_.reset();
                kQueueLengthMetric->Set(constraints_.size());
            }
            (*callback)(result);
        }

4. 完

本文中,我们详细分析了约束构建器的成员变量,以及MaybeAdd-WhenDone调用循环和它们的接口实现。看到它借助线程池在后台并行地完成了闭环检测和INTER_SUMMAP类型的约束计算。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1