首页 关于
树枝想去撕裂天空 / 却只戳了几个微小的窟窿 / 它透出天外的光亮 / 人们把它叫做月亮和星星
目录

进程队列

我们已经实现了一个具有上下文切换的任务框架, 在这个框架下我们两个任务的相互切换控制三色灯在红色和绿色之间闪烁。 实际上,这个框架在任务调度方面并没有做太多的工作,其任务调度函数如下所示:

        void task_switch() {
            if (gp_xtos_cur_task == &taskA)
                gp_xtos_next_task = &taskB;
            else
                gp_xtos_next_task = &taskA;

            xtos_context_switch();
        }

在这个函数中,我们先判定当前正在运行的任务,然后修改指针gp_xtos_next_task,最后触发上下文切换。 对于控制LED这样简单的任务,这个调度函数足够胜任。但在任务繁多、复杂的情况下,这样的切换就显得不适合了。 因为这样我们就必须明确知道有多少的任务需要调度,然后再修改调度函数为每个任务添加一条调度规则,调度函数也会显得非常臃肿。

之所以存在这样的问题,是因为我们没有一个数据结构记录需要运行的任务。假如我们有一个数组记录了所有的任务, 就可以通过遍历这个数组来依次执行各个任务。更近一步的,我们可以定义一个任务描述符的队列, 在每次任务调度时先把当前正在运行的任务的描述符添加到队列的末尾,再从队首取出一个描述符作为下一次上下文切换的对象。 那么这个队列就是所谓的进程队列

进程队列可以为我们带来很大的灵活性。任务调度时只需要关心进程队列中是否有进程,至于有多少进程,是什么进程就不用考虑了。 这样我们就可以通过向进程队列中添加新的描述符来添加进程,当一个进程执行完它的任务后,将其描述符从进程队列中删除,以后将不再执行该进程了。 我们不仅可以像在框架中那样,在系统初始化的时候就把进程创建好, 也可以等到系统运行过程中动态的添加和删除进程了。

1. 队列的实现

队列就是一种先进先出的线性表,其实现形式可以是顺序存储的,也可以是链式存储的。这里在Linux内核中双向链表的基础上实现进程队列。

1.1 双向链表

我所见到的几乎所有关于数据结构的书中,链表都是用类似如下的形式实现的,结构体整个作为一个链表:

        struct ListNode {
            DataType_1 data1;
            DataType_2 data2;
            ...
            struct ListNode *prev;
            struct ListNode *next;
        }
        void list_add(struct ListNode *new, struct ListNode *head) {
            head->next->prev = new;
            new->next = head->next;
            new->prev = head;
            head->next = new;
        }
结构体ListNode的定义中DataType_1和DataType_2分别是两种数据类型,prev和next分别是指向前驱和后继的两个指针。 上述代码还给出了一个插入新元素的例子,可以看到实际上对链表的操作只不过是在修改那两个指针而已,与链表中其它字段一点关系都没有。

这种实现方式很直观,但有一个缺点——换一个数据类型我们就不得不重新实现一些链表的插入删除等操作,因而会写出来很多功能类似的代码。 既然链表是一种抽象的数据结构,就不应该与具体的数据类型绑定,而应当像容器一样什么数据类型都可以装进去。

早期Linux,使用的就是上面所说的实现方式,在内核代码中有各种各样的链表。 到了2.1版[·],官方出台了一个强大且通用的链表实现, 使得链表的实现与实际的数据对象没有关联。它不再把结构体整个作为一个链表,而是结构体中包涵了一个链表。就像下面左边SomeData结构体的定义中, 有一个struct list_head的成员list。这个struct list_head就是Linux中的通用链表,其定义如下面右边的代码所示。

        struct SomeData {
            DataType_1 data;
            DataType_2 data;
            ...
            struct list_head list;
        }

        struct list_head {
            struct list_head *prev;
            struct list_head *next;
        }
这样做我们对于链表的操作就只是围绕着struct list_head结构体进行的,不需要为每一种数据类型专门实现一套链表操作了。 此时就有一个问题,当我们拿到一个struct list_head类型的对象,如何确定它是哪个数据的成员呢? Linux内核使用了宏定义container_of()解决了这个问题。通过宏container_of(),我们可以根据某个list_head对象找到包含它的结构体对象的起始地址。 在不同的编译环境下,这个宏的实现略有不同,但其原理都是一样的。下面我们介绍一下在Keil环境下,针对STM32的实现方式。

1.2 宏定义contrainer_of

宏container_of()的定义如下,其中ptr为某一成员变量的指针,type为包含该变量的结构体,member为成员名称。这个宏将根据成员变量的指针, 求出包含该成员的结构体对象的地址,并以指针的形式解释之。

        #define container_of(ptr, type, member) ((type *)((uint8 *)(ptr) - offsetof(type, member)))
假设我们有一个struct SomeData的对象a,和一个指向其成员list的指针p,那么我们可以像这样获取a的地址,其中pa为一个指向a的指针。
        struct SomeData *pa = container_of(p, struct SomeData, list);
在container_of()中,还有一个宏定义offsetof(),用于求取成员在结构体中的偏移地址,也就是成员是结构体中的第几个字节。 用成员变量的地址减去其在结构体中的偏移地址,得到的就是结构体数据的地址。但在做减法之前,我们需要对ptr进行强制类型转换, 将其解释成一个uint8类型的指针,这样对ptr的加减运算就是uint8的类型长度为单位进行的了。uint8是我们通过typedef关键字定义的类型, 它实际上是一个无符号的char类型,长度就是一个字节。所以强制类型转换后的成员指针ptr减去成员的偏移地址, 得到的就是type类型的结构体对象的起始地址,最后通过强制类型转换得到指向结构体的指针。

可能有读者会有这么一个疑问,本身ptr就是一个指针已经记录了成员变量在内存中的地址了,为什么一定要将ptr转换成为一个uint8的指针? 在C语言中,对于指针的加减操作是与类型相关的。对于一个int类型的指针p而言,因为int类型的数据长度是4个字节, 所以对p进行加1的操作,p的值实际增加了4。也就是说,对于指针的加减操作,指针值的增减是以其类型的数据长度为单位的, 对于char型指针就是1,int型就是4,double型就是8…… 在container_of()中我们除了知道结构体的类型为type和成员变量的名称之外,不再有更多的类型相关的信息,只能以字节为单位进行计算, 所以必须将ptr强制转换为uint8类型。

现在我们再来解释宏offsetof()是如何获得成员在结构体中的偏移地址的。它的定义如下,其中type是结构体类型,member是成员名称。

        #define offsetof(type, member) ((unsigned long) &((type *)0)->member)
让我们先看"(type *)0"这一部分,它的意思是把数值0解释称为一个type类型的指针。 我们对0地址进行强制类型转换使其成为一个类型为type的指针。有人可能会觉得很奇怪,0分明就是一个数,连个变量都不是, 如何能够对其进行强制类型转换?实际上,我们可以将之看做是对一个数值为0的常量进行的类型转换:
        const int tmp = 0;
        (type *)tmp
现在,我们得到了一个指向0地址的指针,通过'->'运算符可以访问0地址下type类型对象的成员member。 有人会说0地址的指针不能随便用,它会导致系统崩溃。实际上只要我们不去修改0地址下的数据内容,系统是不会崩溃的。 访问到成员member之后用'&'对其取址,因为type类型的起始地址就是0,所以此时得到的成员地址就是其在结构体中的偏移地址。 对这个偏移地址进行强制类型转换为一个无符号的整型,我们就知道成员member在type类型中的第几个字节上了。

借助container_of(),Linux内核提供了一个封装用于根据链表项来求取包含它的结构体对象,如下:

        /*
         * list_entry - get the struct for this entry
         * @ptr: the &struct list_head pointer.
         * @type: the type of the struct this is embedded in.
         * @member: the name of the list_struct within the struct.
         */
        #define list_entry(ptr, type, member) \
            container_of(ptr, type, member)

1.3 基于双向循环链表的队列

为了保证队列的先入先出的特性,在添加新元素时我们需要将之加到队尾,从队首取出元素,因此我们还需要两个指针分别指向队首和队尾。 让我们再回过头来看list_head的定义,就会发现它的成员就只有两个指针。 完全可以定义一个list_head的对象head让其中一个指向队首,另一个指向队尾。 我们还知道队首是没有前驱的,队尾是没有后继的,所以队首的prev和队尾的next成员通常指向一个list_head对象标志着它们是队首或者队尾。 我们也可以拿head作为这个特定的值。这样就构成了一个双向循环链表,如下图1所示。 实际上,在Linux中list_head就是以双向循环链表的形式实现的。

图1 双向循环链表示意图

从图1中,可以看出我们还需要一个特殊的list_head对象,用于标记整个链表,暂时称之为链表头。 链表头与链表中其它对象不同的是,它不需要塞在一个结构体里, 它的next指针指向链表中的第一个塞在结构体中的list_head对象,prev则指向链表中最后一个塞在结构体中的list_head对象, 从而构成了一个循环链表,而它就是我们所需要的队列结构。也就是说链表头的前驱就是我们的队尾,后继就是队首

Linux中提供了函数list_add_tail()用于添加新的链表项作为指定链表项的前驱。使用该函数我们就可以在队尾中添加新元素了。

        /*
         * list_add_tail - add a new entry
         * @new: new entry to be added
         * @head: list head to add it before
         *
         * Insert a new entry before the specified head.
         * This is useful for implementing queues.
         */
        void list_add_tail(struct list_head* new, struct list_head* head) {
            __list_add(new, head->prev, head);
        }

通过宏定义list_first_entry(),我们可以获取指定链表项后继的宿主结构体对象,也就是我们所关心的队首。 需要注意的是使用该宏时应当保证队列不是空的。出队操作,除了获取队首元素外,还应将队首所包含的list_head对象从链表中删除。 Linux也提供了删除链表项的函数list_del()可供我们使用,详细的函数实现不在这里列出了。

        /**
         * list_first_entry - get the first element from a list
         * @ptr:   the list head to take the element from.
         * @type:  the type of the struct this is embedded in.
         * @member:    the name of the list_struct within the struct.
         *
         * Note, that list is expected to be not empty.
         */
        #define list_first_entry(ptr, type, member) \
            list_entry((ptr)->next, type, member)

通过函数list_empty(),我们可以判断一个链表是否为空。它只是简单的判定其后继是否是自己,如果是自己则说明链表中除了链表头之外, 不存在其它链表项,所以为空。这也就要求我们调用函数init_list_head()对链表头进行初始化,使其前驱和后继都指向自己。

        int list_empty(const struct list_head *head) {
            return head->next == head;
        }

        void init_list_head(struct list_head* list) {
            list->prev = list;
            list->next = list;
        }

这样借助linux内核链表提供的函数和宏定义,我们就可以实现一个基于双向循环链表的队列了。下面我们将用这种形式实现一个进程队列。

2. 进程队列

为了实现进程队列,首先我们定义了对象struct list_head L0_tasks,用于标记进程队列。同时定义了一个初始化操作系统的函数xtos_init(), 通过函数init_list_head()将之标记为空。以后随着我们操作系统逐渐变得复杂,这个函数也会相应的做出更多的初始化工作。

    void xtos_init() {
        init_list_head(&L0_tasks);
    }

然后,我们修改任务描述符,在其中嵌入一个struct list_head的成员。此外,我们还对其做出了如下的修改:

  1. 把结构体名称由xtos_task_struct改为xtos_task_descriptor,并定义了类型名xtos_task_desp_t。这样做没有什么别的意义,就是觉得原来的名字不舒服。
  2. 添加了字段pid用于记录进程ID。为了管理起来方便,我们应该为每个进程分配一个唯一的标识符,虽然现在我们还不会用到它。
  3. 添加了指针pBottomOfStack指示进程栈空间的栈底。这里主要是为了防止函数嵌套调用以及局部变量过多,造成栈空间溢出进而导致系统崩溃。目前也没有用到它。
        typedef struct xtos_task_descriptor {
            uint32 *pTopOfStack;        /* 栈顶地址,该位段不可以更改 */
            uint32 *pBottomOfStack;     /* 栈底地址 */
            uint16 pid;                 /* 进程ID */
            struct list_head list;      /* 链表对象 */
        } xtos_task_desp_t;

再然后,我用static关键字修饰了原先用于创建进程的函数xtos_create_task(),使其不再暴露给用户程序了。 另外添加了函数xtos_init_task_descriptor()用于初始化进程描述符,实现代码如下,其中tcb为目标进程描述符, task是进程任务入口函数,stk_bottom则是进程栈空间的栈底地址,pid为进程ID用于唯一标识一个进程。 xtos_create_task()函数仍然用于初始化用户进程的栈空间等资源分配任务,只是需要返回进程的栈顶地址。

        void xtos_init_task_descriptor(struct xtos_task_descriptor *tcb, xtos_task task, uint32 *stk_bottom, uint16 pid) {
            tcb->pBottomOfStack = stk_bottom;
            tcb->pTopOfStack = xtos_create_task(tcb, task, stk_bottom);
            tcb->pid = pid;

            init_list_head(&tcb->list);
            list_add_tail(&tcb->list, &L0_tasks);
        }

在函数xtos_init_task_descriptor()中,除了向pBottomOfStack, pid等成员变量赋初值之外,还调用了xtos_create_task()函数为新创建的进程分配资源。 并先调用init_list_head()函数对进程描述符中的链表对象list进行初始化。再调用list_add_tail()函数,把新进程描述符添加到进程队列的队尾。

3. 进程调度

重写系统调度函数为xtos_schedule()。在函数开始和结尾使用"__asm"成对地插入了两句汇编,分别用于关中断和开中断。 因为系统逐渐复杂之后,触发进程调度的函数可能不止一处,存在多个进程或者中断服务函数同时触发进程调度的情况。 这种情况实际上是一种竞争的状态,对于进程队列以及执行进程指针做出不合理的修改,导致系统不能正常运行。 为了解除竞争,这里就关闭中断进入临界区,保证该函数中的语句能够按顺序依次执行完毕,即一些参考书中所谓的原子性

        void xtos_schedule(void) {
            __asm("CPSID    I");
            list_add_tail(&gp_xtos_next_task->list, &L0_tasks);
            gp_xtos_next_task = list_first_entry(&L0_tasks, struct xtos_task_descriptor, list);
            list_del(&gp_xtos_next_task->list);
            xtos_context_switch();
            __asm("CPSIE   I");
        }

然后调用list_add_tail()把当前正在执行的进程添加到进程队列的队尾,并从队首通过宏list_first_entry()取出一个进程描述符的地址, 赋予进程指针gp_xtos_next_task,作为下次上下文切换时的目标进程。最后调用list_del()把目标进程从进程队列中删除, 并调用xtos_context_switch()触发上下文切换。

需要说明的是,第3行对函数list_add_tail()传参使用的是指针gp_xtos_next_task并没有错误,而是特意为之的。 最初我确实使用的是gp_xtos_cur_task,但后来发现系统运行了一段时间后,进程队列中就只有一个进程了。

造成这种现象的原因是,调用xtos_schedule()的时候并不是立即进行上下文切换的。 我们在介绍上下文切换的原理时提到, 在调度函数中执行的xtos_context_switch()只是触发了PendSV中断,真正的上下文切换是在PendSV的中断服务函数中做的。 而PendSV中断的优先级是最低的,只有在当前没有中断需要处理时才会进入它的中断服务函数。 那么当一个用户进程已经触发进行了一次系统调度,就会修改进程指针gp_xtos_next_task的值, 只有等到处理器响应PendSV的中断服务时,才可能将该值赋给gp_xtos_cur_task。如果在进入PendSV的中断服务函数之前,又调用了一次xtos_schedule函数, gp_xtos_next_task就会被赋予一个新值,而此时gp_xtos_cur_task的值还没有被修改过。 这样gp_xtos_next_task中的旧值(也就是上次调度时的目标进程)将被丢弃,重复几次这样的操作,进程队列中就会只剩下一个进程描述符。

此外,我们还添加了一个函数xtos_start(),用于启动操作系统。它做的工作与调度函数类似,只是第一次触发上下文切换,不需要保存当前进程到队尾而已。

        void xtos_start(void) {
            __asm("CPSID    I");
            gp_xtos_next_task = list_first_entry(&L0_tasks, struct xtos_task_descriptor, list);
            list_del(&gp_xtos_next_task->list);

            xtos_state = XTOS_STATED;
            xtos_first_switch();
            __asm("CPSIE   I");
        }

在main函数中通过如下的语句,先后完成操作系统的初始化、创建两个进程、开启操作系统。

        xtos_init();
        xtos_init_task_descriptor(&taskA, taska, &taskA_Stk[TASKA_STK_SIZE - 1], 0);
        xtos_init_task_descriptor(&taskB, taskb, &taskB_Stk[TASKB_STK_SIZE - 1], 1);
        xtos_start();
在用户进程taska和taskb中,把原来调用的task_switch(),更改为xtos_schedule()。 编译运行后可以得到和具有上下文切换的任务框架一样的现象, 代码可以在这里下载。

4. 总结

在本文中,我们使用Linux的内核链表实现了一个进程队列。需要添加新进程时,我们只需要为该进程分配好栈空间等资源后,添加进程到队列中, 以后就可以对其进行调度了。需要删除进程时,只需要把目标进程从队列中删除,以后就不会再调度该进程了,必要的话我们还可以释放其占用的资源。 总之,进程队列为我们提供了一个方便管理的结构。




Copyright @ 高乙超. All Rights Reserved. 京ICP备16033081号-1