AbydOS开发日记 (6) - 多核初步

3/22/2024 OSC++

# 多核与多线程

一个 CPU 上可能不止一个核心 (Hart),这些核心在物理上分开,独立运行,但是共享总线和外设。也就是说,它们的物理内存空间是一致的,只不过有自己单独的寄存器和 Cache、MMU。而多线程非常类似,只不过是通过调度的方式实现,将多个线程的寄存器等资源分时复用。在裸机的角度上看,我们可以把多核看成硬件的多线程,并利用类似的技术实现。

# 核心的启动

在 RISCV 手册中,并没有规定 Hart 的启动顺序。比较通常的做法是,核心全部(近乎)同时从 M-mode 启动,随后 SBI 会通过原子量确定一个启动核心,并且执行初始化。至于 S-mode 中是否同时启动,我并没有明确看到文档指出,所以应该也是 SBI 实现自定义。通过实验,OpenSBI 只会将启动核心带入到 S-mode,其他核心会留在 M-mode 等待。但是,我们仍然有必要通过一个原子量保证 S-mode 初期只有一个核心运行,具体看,就是下面的汇编代码:

_start:
	/* Pick one hart to run the main boot sequence */
	lla	a3, _hart_lottery
	li	a2, 1
	amoadd.w a3, a2, (a3)
	bnez	a3, _start_hang // in case multi core start at the same time
1
2
3
4
5
6

这些代码在入口点立即执行,如果有多个核心,没有抢到控制权的就会跳到 _start_hang 的死循环里。当然,为了兼容性,后面肯定会设置一下中断再睡觉,不然在其他 SBI 上可能没法用。

那么我们如何通过 SBI 启动核心呢?答案是 SBI 的 HSM 扩展,即 Hart State Management。该扩展提供 4 个 ecall 调用,分别对应核心的 启动、(自)停止、状态查询和暂停。如下:

struct sbiret sbi_hart_start(unsigned long hartid,
  unsigned long start_addr,
  unsigned long opaque);

struct sbiret sbi_hart_stop(void);

struct sbiret sbi_hart_get_status(unsigned long hartid);

struct sbiret sbi_hart_suspend(uint32_t suspend_type,
  unsigned long resume_addr,
  unsigned long opaque);
1
2
3
4
5
6
7
8
9
10
11

文档说明,可以通过 sbi_hart_start 从指定的 start_addr 启动核心,并且其 a0 = hartida1 = opaque

# 核心的初始化

基于之前的布局,确定启动其他核心的时机是设备外设驱动安装完成后。此时内存映射和系统栈已经确定,可以确定下一步的所有数据。启动后,执行环境非常原始,需要一点汇编来做初始化工作。

启动流程

  1. 汇编初始化C环境
  2. k_early_boot() 初始化 C++ 环境
  3. k_boot() 探测并安装系统设备,开启 MMU,从系统堆分配并映射系统栈区
  4. 返回汇编,设置系统栈,进入 k_boot_perip()
  5. k_boot_perip() 探测并安装外设,初始化系统输入输出等
  6. 返回汇编,设置启动核心栈,带入 k_boot_harts()
  7. k_boot_harts() 调 SBI 启动其他核心
  8. 被启动核心初始化 MMU 和栈等,再进入 C++ 初始化执行环境,等待系统状态切换
  9. 全部启动完成,进入多核模式

# 内存映射

核心启动后,其 SATP = 0 和 sstatus.SIE = 0,即 MMU 和中断是关闭的。我们的系统栈是布置在虚拟地址空间的,所以要先启用 MMU。在之前的启动步骤,我们保存了系统 MMU 配置寄存器 (SATP) 的值,我们将在所有核心上使用同样的映射。所以内存映射非常简单,只需将保存的值写入 satp 后,刷新 TLB 即可:

_start_hart:
    /* a0: hartid a1: reserved */
	lla   a2, _sys_satp
	REG_L a2, 0(a2)
	csrw  CSR_SATP, a2
	sfence.vma zero, zero
1
2
3
4
5
6

# 核心系统栈的设定

在前面的步骤,我们确定了系统栈的空间,这里要根据 hart id,通过简单的乘法得到核心系统栈基址。如下:

	/* Setup system stack base according to hart_id as offset */
	lla a3, _sys_stack_base
	REG_L sp, 0(a3)
	li  a2, K_CONFIG_STACK_SIZE
	mul a3, a0, a2
	sub sp, sp, a3
1
2
3
4
5
6

理论上,这里开始就可以进入 C++ 了,但是不要忘记一件事,就是 libc 的上下文还没解决。

# 多核下 Newlib 的上下文

# 可重入函数

在 newlib 中,几乎所有函数都提供了两个版本,即不可重入的普通版本(ANSI),以及可重入的版本。不可重入的版本,其最终还是调用了可重入版本的函数,不过传入了一个全局重入结构。例如,printf:

不可重入版:

int	printf (const char *__restrict, ...)
               _ATTRIBUTE ((__format__ (__printf__, 1, 2)));
1
2

可重入版:

int	_printf_r (struct _reent *, const char *__restrict, ...)
               _ATTRIBUTE ((__format__ (__printf__, 2, 3)));
1
2

可以注意到,可重入版本多了一个 _reent 结构体,其在 <sys/reent.h> 定义,一并定义了全局的:

/*
 * All references to struct _reent are via this pointer.
 * Internally, newlib routines that need to reference it should use _REENT.
 */

#ifndef __ATTRIBUTE_IMPURE_PTR__
#define __ATTRIBUTE_IMPURE_PTR__
#endif

extern struct _reent *_impure_ptr __ATTRIBUTE_IMPURE_PTR__;

#ifndef __ATTRIBUTE_IMPURE_DATA__
#define __ATTRIBUTE_IMPURE_DATA__
#endif

extern struct _reent _impure_data __ATTRIBUTE_IMPURE_DATA__;

/* #define _REENT_ONLY define this to get only reentrant routines */

#if defined(__DYNAMIC_REENT__) && !defined(__SINGLE_THREAD__)
#ifndef __getreent
  struct _reent * __getreent (void);
#endif
# define _REENT (__getreent())
#else /* __SINGLE_THREAD__ || !__DYNAMIC_REENT__ */
# define _REENT _impure_ptr
#endif /* __SINGLE_THREAD__ || !__DYNAMIC_REENT__ */

#define _REENT_IS_NULL(_ptr) ((_ptr) == NULL)

#define _GLOBAL_REENT (&_impure_data)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

在 newlib 内部,不可重入函数通过 _REENT 宏获取全局重入对象指针后传入可重入版本,完成后续操作。

# 提供核心本地的 _reent

从上面的定义可以看出,我们如果以 __DYNAMIC_REENT__ 定义的情况编译 newlib,那么就可以通过 __getreent() 函数,提供核心本地的 _reent 结构。稍微修改编译脚本,重新编译工具链(参见此处 (opens new window)),然后添加一个实现:

    struct _reent *__getreent(void)
    {
        if (k_stage == K_MULTICORE)
            return &hl_reent;
        return _GLOBAL_REENT;
    }
1
2
3
4
5
6

这里 hl_reent 是线程本地 (Hart Local) 的 _reent

读到这里,很容易想到一个问题,那就是我的 hl_reent 如何存储呢?

# 核心本地存储

核心本地存储,就是每个核心所单独享有的存储空间。在不同核心上运行的系统程序,应该具有同样的需求,(代码都一样啊),即我们需要通过一些手段来区分核心并从不同的地方提供存储。

# 原始方法

一个简单而容易想到的方法是,不使用统一的 MMU 映射,而是通过差别映射,将核心系统栈固定在一个地址,然后在核心栈以上预留一段空间,作为核心本地存储。具体实现,可以参考此提交时的仓库 (opens new window)

获取地址的函数示例:

void *k_getHartLocal()
{
    extern uintptr_t _sys_stack_base;
    extern char _KERNEL_HART_LOCAL_DATA_SIZE;
    return (void *)(_sys_stack_base - (uintptr_t)&_KERNEL_HART_LOCAL_DATA_SIZE);
}
1
2
3
4
5
6

# 改进方案:利用 TLS

前面说过,多核可以看成硬件实现的多线程,那就可以利用一下多线程的技术。这里的 TLS 不是 计网的 TLS,而是 Thread Local Storage。其具体定义可以参考 ELF Handling For Thread-Local Storage (opens new window)

简单来说,通过 C++ 的 thread_local 关键字,或者 C 中的 _Thread_local 定义的变量,会被编译器放入特别的段:.tbss.tdata。这些段并不是直接拿来用的,而是作为一个副本,由实现自行加载内存区域并初始化。那么,如何寻址呢?在 RISCV 上,静态链接的情况下,是通过 tp 寄存器。该寄存器持有一个基址,即 TLS 本地存储的最低地址。我们要做的,首先是创建这两个段,拿到其原始数据的地址:

    .tdata :
	{

		PROVIDE(_tdata_start = .);

		*(.tdata)
		*(.tdata.*)
		. = ALIGN(8);
		PROVIDE(_tdata_end = .);
	} 

	/* In TLS, do not add align 4K, otherwise addressing to tbss will need to add a 4k from $tp which cost much space */

	.tbss : {
		PROVIDE(_tbss_start = .);
		*(.tbss)
		*(.tbss.*)
		. = ALIGN(8);
		PROVIDE(_tbss_end = .);
		
	}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

要注意的是,由于依赖 tp 寻址,所以两个段中间的空闲区域也会算进去,应该不加对齐,否则会浪费内存,搞不好就非法访问了。(我才不会告诉你我调了一上午才发现 qwq)。

然后,在核心初始化时基于核心栈底,保留一段空间作为 TLS ,设置好 tp 寄存器通过汇编实现:

	/* Reserve Exception stack */
	li a3, K_CONFIG_EXCEPTION_BACKUP_SIZE
	csrw CSR_SSCRATCH, sp
	sub sp, sp, a3

_process_tbss:
	/* Setup tbss */
	lla a4, _tbss_start
	lla a5, _tbss_end
	beq a4, a5, _process_tdata

	sub a2, a5, a4
	sub sp, sp, a2
	mv  tp, sp
	mv	a3, tp

	/* Zero-out tbss */
_tbss_zero:
	REG_S	zero, (a3)
	add a3, a3, __SIZEOF_POINTER__
	add	a4, a4, __SIZEOF_POINTER__
	blt	a4, a5, _tbss_zero

_process_tdata:
	/* Setup tdata */
	lla a3, _tdata_start
	lla a2, _tdata_end
	beq a3, a2, _fini_setup_hart

	sub a2, a2, a3
	sub sp, sp, a2
	mv tp, sp

	/* Copy tdata */
	lla a4, _tdata_end
	mv a5, tp
_tdata_copy:
	REG_L a2, (a3)
	REG_S a2, (a5)
	add a3, a3, __SIZEOF_POINTER__
	add a5, a5, __SIZEOF_POINTER__
	blt a3, a4, _tdata_copy
_fini_setup_hart:
	ret
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

这里顺便预留一下异常处理空间。处理完成后,整个布局如下:

核心存储布局

# TLS 支持的 newlib

NewLib 本身可以使用 TLS 来替代 _reent,但是我目前没有采用这种方案,原因是,

  1. 在启动早期的单核阶段,TLS 需要额外的空间分配和复制,比较难以实现
  2. 在系统退出多核模式后,还要重新初始化 TLS,避免死锁和冲突

# 多核同步

# 进入多核模式

由于核心启动需要时间,而我们不希望在所有核心都启动之前就开跑,就需要使用一个变量构造锁。

    enum k_stage_t
    {
        K_BEFORE_BOOT = 0,
        K_BOOT = 1,
        K_BOOT_PERIP = 2,
        K_BOOT_HARTS = 3,
        K_MULTICORE = 4,
        K_CLEARUP = 5
    };

    extern k_stage_t k_stage;
1
2
3
4
5
6
7
8
9
10
11

然后在核心启动到 k_premain() 中增加:

    while (k_stage != K_MULTICORE)
        ;          // wait for the boot core to finish
1
2

这将导致所有后启动的核心等待启动核心完成。

# 退出多核模式

退出多核模式之前,需要保证后启动核心正常停止,然后由启动核心清理现场。通过 k_after_main() 函数实现:

int k_after_main(int hartid, int main_ret)
{
    if (hartid < 0) // Non boot hart
    {

        printf("Hart %i has returned with %d\n", ::hartid, main_ret);
        k_hart_state[::hartid] = 3;
        // printf("Failed to stop hart: %ld\n", SBIF::HSM::stopHart());
    }
    else
    {
        printf("\n> Waiting for other harts to return...\n");
        int flag = 0;
        do // a timeout can be added here
        {
            flag = 0;
            for (int i = 0; i < 8; ++i)
            {
                if (i == hartid)
                    continue;
                if (k_hart_state[i] == 1 || k_hart_state[i] == 2)
                {
                    flag = 1;
                    break;
                }
                if (k_hart_state[i] == 3 || k_hart_state[i] == 0)
                    continue;
            }
        } while (flag);
        k_stdout_switched = false;
        k_stage = K_CLEARUP;
    }
    return main_ret; // pass to the lower
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# newlib 的锁

虽然我们使用 TLS 提供了核心专用的 _reent,但是 IO 方面仍然不支持多核,需要加锁。这里要利用 newlib 的 retargetable locking,提供自己的实现。首先看 <sys/lock.h> 定义:

#if !defined(_RETARGETABLE_LOCKING)

typedef int _LOCK_T;
typedef int _LOCK_RECURSIVE_T;

#define __LOCK_INIT(class,lock) static int lock = 0;
#define __LOCK_INIT_RECURSIVE(class,lock) static int lock = 0;
#define __lock_init(lock) ((void) 0)
#define __lock_init_recursive(lock) ((void) 0)
#define __lock_close(lock) ((void) 0)
#define __lock_close_recursive(lock) ((void) 0)
#define __lock_acquire(lock) ((void) 0)
#define __lock_acquire_recursive(lock) ((void) 0)
#define __lock_try_acquire(lock) ((void) 0)
#define __lock_try_acquire_recursive(lock) ((void) 0)
#define __lock_release(lock) ((void) 0)
#define __lock_release_recursive(lock) ((void) 0)

#else

#ifdef __cplusplus
extern "C" {
#endif

struct __lock;
typedef struct __lock * _LOCK_T;
#define _LOCK_RECURSIVE_T _LOCK_T

#define __LOCK_INIT(class,lock) extern struct __lock __lock_ ## lock; \
	class _LOCK_T lock = &__lock_ ## lock
#define __LOCK_INIT_RECURSIVE(class,lock) __LOCK_INIT(class,lock)

extern void __retarget_lock_init(_LOCK_T *lock);
#define __lock_init(lock) __retarget_lock_init(&lock)
extern void __retarget_lock_init_recursive(_LOCK_T *lock);
#define __lock_init_recursive(lock) __retarget_lock_init_recursive(&lock)
extern void __retarget_lock_close(_LOCK_T lock);
#define __lock_close(lock) __retarget_lock_close(lock)
extern void __retarget_lock_close_recursive(_LOCK_T lock);
#define __lock_close_recursive(lock) __retarget_lock_close_recursive(lock)
extern void __retarget_lock_acquire(_LOCK_T lock);
#define __lock_acquire(lock) __retarget_lock_acquire(lock)
extern void __retarget_lock_acquire_recursive(_LOCK_T lock);
#define __lock_acquire_recursive(lock) __retarget_lock_acquire_recursive(lock)
extern int __retarget_lock_try_acquire(_LOCK_T lock);
#define __lock_try_acquire(lock) __retarget_lock_try_acquire(lock)
extern int __retarget_lock_try_acquire_recursive(_LOCK_T lock);
#define __lock_try_acquire_recursive(lock) \
  __retarget_lock_try_acquire_recursive(lock)
extern void __retarget_lock_release(_LOCK_T lock);
#define __lock_release(lock) __retarget_lock_release(lock)
extern void __retarget_lock_release_recursive(_LOCK_T lock);
#define __lock_release_recursive(lock) __retarget_lock_release_recursive(lock)

#ifdef __cplusplus
}
#endif

#endif /* !defined(_RETARGETABLE_LOCKING) */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

好了,你已经猜到要干嘛了,重新编译工具链,打开这个选项。

之后,利用 std:atomic 造一个锁结构:

    struct __lock
    {
        std::atomic<int> owner = -1;
        std::atomic<int> recursive_count = 0;

        void lock(bool recursive = false)
        {

            if (k_stage != K_MULTICORE)
                return;
            // _write(0, (char*)"PLOCK\n",6);
            if (recursive)
            {
                if (owner == hartid)
                {
                    recursive_count++;
                    return;
                }
            }
            int null_owner = -1; // helper for compare_exchange_weak
            while (!owner.compare_exchange_weak(null_owner, hartid))
                null_owner = -1;

            recursive_count = 1;
            // _write(0, (char *)"LOCK\n", 5);
        }

        void unlock(bool recursive = false)
        {
            if (k_stage != K_MULTICORE)
                return;
            if (recursive)
            {
                if (owner == hartid)
                {
                    if (--recursive_count == 0)
                        owner = -1;
                }
                return;
            }

            if (owner == hartid)
                owner = -1;
        }
    };
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

由于声明中没有使用 __weak,所以我们要将整个编译单元的所有函数和变量全部覆盖,以通过编译,详情可以参见libc_hooks.cpp (opens new window)

# 后记

通过这样的操作,多核环境就搭建好了,输出也没有冲突。当然,博客所示的代码还是比较少,详细实现还请参看仓库代码 (opens new window)

这一波用了 3 天,属实给我调试到麻了,各种冲突、异常,整个都是比较抽象的。不过,结果是好的,下一步,就可以折腾 CPU 的中断了。亲爱的读者,我们下期再会!