几种唤醒方式的时延

几种唤醒方式的时延

1.背景

在多核世界里开发复杂系统,不可避免地要涉及多进程(线程)协作的场景。最常见的协作模式是生产者-消费者模式。生产者产生消息的时候,如何快速地唤醒消费者去处理,是一个基本问题。唤醒得越快,越有利于我们将系统内部时延降到最低。

Linux提供了众多的接口供我们实现这一功能,这里我们选取几个常用的进行分析和测量。

2.分析

a) pthread_cond_wait/signal

这个基本上是最常用的接口了,读者调用wait等待被唤醒,写者调用signal唤醒读者。

该接口要点如下:

  • 如果写者signal的时候,读者并没有在wait,那么这次signal就丢了
  • wait接口还有个timeout版本,可以指定超时时间,以避免长时间处于wait状态
  • signal有个broadcast版本,可以批量唤醒一批进程/线程
  • 这个接口要配合mutex使用,略显啰嗦

b) semaphore

信号量基本上是这里面历史最悠久的一个了,读者调用wait等待被唤醒,写者调用post唤醒读者。

该接口要点如下:

  • 该接口基本可以看成是传递信号的消息队列
  • 写者调用post时可以看做是往队列中放了一个信号
  • 读者调用wait的时候也可以看成是从队列中取出了一个信号
  • post和wait的操作是对等的,即有几个post就得有几个wait

c) eventfd

eventfd是近期版本的linux内核才加入的接口,要点如下:

  • 句柄被实现为文件描述符,可以使用类似read/write的接口进行读写,也可以被poll、epoll等多路复用机制管理
  • write的时候,可以写进去一个整数值,在read端可以自行合并,比如连续调用两次write,分别写入2、3,read的时候,读出来的是5
  • 只能在有亲缘关系的进程/线程之间通信
  • 不能实现类似broadcast这样的语义,一次只能唤醒一个读者线程

d) spin wait

抛弃操作系统提供的机制,读者自己去轮询查询是否有新数据。

3.测量

我们设计了一个测试程序,主要功能如下:

  • 支持测量上述四种唤醒方式的时延
  • 读者和写者需要绑定在同一处理器的不同核心上
  • 将读者或写者绑定在CPU-0上(0号CPU负责处理中断)
  • 使用rdtsc测量时延

*注意,上述规定十分重要,不然测试结果不可信。我之前进行过类似的测试,由于操作不当,结果完全是错的。

代码头文件如下:

#ifndef UTIL_H_
#define UTIL_H_

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <pthread.h>

//------------CPU绑定相关------------

void print_running_cpu()
{
    char qry_cmd[1024] = { 0 };
    sprintf(qry_cmd, "ps -o pid,spid,psr -T -p %d | grep %d | tail -n 1 | awk {'print $3'}", 
        getpid(), gettid());
    FILE *fp=popen(qry_cmd,"r");
    if(fp == NULL)
        return ;

    char cpu_id_str[200] = { 0 };
    fgets(cpu_id_str,80,fp);
    fclose(fp);

    printf("[%d] : current thread(%d@%d) is running on cpu(%d)\n", 
        gettid(), gettid(), getpid(), atoi(cpu_id_str));
}

void print_thread_affinity()
{
    cpu_set_t cpu_mask;

    CPU_ZERO(&cpu_mask);
    pthread_getaffinity_np(pthread_self(), sizeof(cpu_mask), &cpu_mask);

    printf("[%d] : current thread(%d@%d) can be running at cpu(",
        gettid(), gettid(), getpid());

    int cpu_num = sysconf(_SC_NPROCESSORS_CONF);
    for(int i = 0; i < cpu_num; ++i)
    {
        if (CPU_ISSET(i, &cpu_mask))//判断线程与哪个CPU有亲和力
        {
            printf("%d, ", i);
        }
    }
    printf(")\n");
}

void bind_thread_to_cpu(int cpu_id)
{
    cpu_set_t cpu_mask;
    CPU_ZERO(&cpu_mask);

    CPU_SET(cpu_id, &cpu_mask);
    //print_thread_affinity();
    printf("[%d] : binding current thread(%d@%d) to cpu(%d)\n",
        gettid(), gettid(), getpid(), cpu_id);
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_mask), &cpu_mask);
    //print_thread_affinity();
}

//------------计时相关------------

uint64_t get_cpu_freq()
{
    FILE *fp=popen("lscpu | grep CPU | grep MHz | awk  {'print $3'}","r");
    if(fp == NULL)
        return 0;

    char cpu_mhz_str[200] = { 0 };
    fgets(cpu_mhz_str,80,fp);
    fclose(fp);

    return atof(cpu_mhz_str) * 1000 * 1000;
}

uint64_t get_tsc()
{
    uint64_t a, d;
    __asm__ volatile("rdtsc" : "=a"(a), "=d"(d));
    return (d << 32) | a;
}

double get_time_diff_ns(uint64_t beg_tsc, uint64_t end_tsc)
{   
    return  (end_tsc - beg_tsc) * 1.0 * 1000 * 1000 * 1000 / get_cpu_freq();
}

#endif

代码主逻辑如下:

#include "util.h"

#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/eventfd.h>

// g++ -o wake_test wake_test.c -lpthread
//     -m 唤醒方法:
//            0-pthread_cond_wait/signal
//            1-sem
//            2-event_fd
//            3-忙等轮询
//     -l 测试轮次
//     -s 休息间隔(毫秒)

//测试次数
long g_test_loop_times;
//发送间隔
long g_sleep_time_ms;
//测试方式
int g_method_id;

//记录时间的数组
//写者
volatile int g_write_pos = 0;
uint64_t *g_write_time;
//读者
volatile int g_read_pos = 0;
uint64_t *g_read_time;


//各种不同方式使用的全局变量
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
sem_t g_sem_queue;
int g_ev_fd; // event_fd

//利用不同的唤醒方式实现的测试函数
void write_by_method(int method_id)
{
    //先写
    g_write_time[g_write_pos] = get_tsc();
    ++g_write_pos;

    //根据不同的方法唤醒读线程
    switch(method_id)
    {
        case 0: //pthread_cond_wait/signal
            pthread_mutex_lock(&g_mutex);
            pthread_cond_signal(&g_cond);
            pthread_mutex_unlock(&g_mutex);
            break;
        case 1: //sem
            sem_post(&g_sem_queue);           
            break;
        case 2: //eventfd
            eventfd_write(g_ev_fd, 1);
            break;
        case 3: //不做任何唤醒操作,读线程忙等轮询
            break;
        default:
            break;
    };
}

int read_by_method(int method_id)
{
    //根据不同的方法等待被唤醒
    switch(method_id)
    {
        case 0: //pthread_cond_wait/signal
            pthread_mutex_lock(&g_mutex);
            pthread_cond_wait(&g_cond, &g_mutex); 
            pthread_mutex_unlock(&g_mutex);
            break;
        case 1: //sem
            sem_wait(&g_sem_queue);           
            break;
        case 2: //eventfd
            eventfd_t value;
            eventfd_read(g_ev_fd, &value);
            break;
        case 3: //忙等轮询
            while(g_write_pos == g_read_pos)
                ;
            break;
        default:
            break;
    };

    //读出(考虑合并读出的情况)
    int read_cnt = 0;
    while(g_write_pos != g_read_pos)
    {
        g_read_time[g_read_pos] = get_tsc();
        ++g_read_pos;
        ++read_cnt;
    }

    return read_cnt;
}

void print_result() //微秒
{
    double sum_diff = 0.0;
    double max_diff = 0.0;
    double min_diff = 1.0*24*60*60*1000*1000;
    for(int i=0; i<g_test_loop_times; ++i)
    {
        double this_diff = get_time_diff_ns(g_write_time[i], g_read_time[i]) / 1000;
        //printf("%.02lf us\n", this_diff);
        sum_diff += this_diff;
        if(this_diff < min_diff)
            min_diff = this_diff;
        if(this_diff > max_diff)
            max_diff = this_diff;   
    }
    printf("MAX : %.02lf us\n", max_diff);
    printf("MIN : %.02lf us\n", min_diff);
    printf("AVG : %.02lf us\n", sum_diff / g_test_loop_times);
}

void* write_func(void* p_arg)
{
    int bind_cpu_id = *(int *)p_arg;
    bind_thread_to_cpu(bind_cpu_id);
    
    long loop = g_test_loop_times;
    while(loop--)
    {
        write_by_method(g_method_id);
        usleep(g_sleep_time_ms * 1000);
    }

    return NULL;
}

void* read_func(void* p_arg)
{
    int bind_cpu_id = *(int *)p_arg;
    bind_thread_to_cpu(bind_cpu_id);
    
    long loop = g_test_loop_times;
    long read_cnt = 0;
    while(1)
    {
        read_cnt += read_by_method(g_method_id);
        if(read_cnt >= g_test_loop_times)
            break;
    }

    print_result();
    return NULL;
}

int main(int argc, char **argv)
{
    g_test_loop_times = 100; //测试100次
    g_sleep_time_ms = 1000; //每次休息1秒
    g_method_id = 0; //使用忙等轮询作为唤醒方法

    //解析命令行参数
    int optval  = 0;
    while ((optval = getopt(argc, argv, "l:s:m:")) != EOF)
    {
		switch (optval) 
        {
			case 'l':
                g_test_loop_times = atoi(optarg);
                break;
			case 's':
                g_sleep_time_ms = atoi(optarg);
                break;
			case 'm':
                g_method_id = atoi(optarg);
                break;
            default:
                break;
        }
    }

    //初始化
    g_write_time = (uint64_t *)malloc(sizeof(uint64_t) * g_test_loop_times);
    g_read_time = (uint64_t *)malloc(sizeof(uint64_t) * g_test_loop_times);

    sem_init(&g_sem_queue, 0, g_test_loop_times);

    g_ev_fd = eventfd(0, 0);
    
    //定义四个CPU
    int cpu_id_0 = 0;
    int cpu_id_1 = 1;
    int cpu_id_2 = 2;
    int cpu_id_3 = 3;

    //启动读写线程
    pthread_t thr_id_1, thr_id_2;

    //读者2
    pthread_create(&thr_id_1, NULL, read_func, &cpu_id_2);

    sleep(1);

    //写者
    pthread_create(&thr_id_2, NULL, write_func, &cpu_id_0);
    
    //空转,等待测试完成
    while(1)
        sleep(1);

    return 0;
}

程序支持四个参数:

  • -m用于指定唤醒机制:0是pthread接口;1是semaphore接口;2是eventfd;3是忙等轮询
  • -l用于指定测试轮次,即测试几次求平均
  • 0s用于指定每次测试之间的休息间隔,单位是毫秒,建议尽量设大一点,比如每次间隔休息1秒,这样可以避免写信号被合并读出,使测试数据偏小

我测试的机器配置如下:

  • Intel Xeon E5-2637 v4 @3.5GHz
  • 两个NUMA节点,共16核

测试循环100个轮次,每个轮次之间休息1秒,测试结果如下:

唤醒方法最大(us)最小(us)平均(us)
pthread5.832.483.06
semaphore4.562.252.66
eventfd5.372.302.71
spin wait0.310.10.14

4.结论

根据上面的测试结果,除了spin wait效果优秀外,其他三种唤醒方式都没有本质区别。因为前面三种都是基于操作系统中断的,注定快不到哪去,而spin wait只有CPU缓存同步的开销,因此极快。

但是spin wait会带来另外的开销:CPU需要一直满负荷跑,对机器寿命、散热、电源、机房安全等都是巨大的挑战。对于机器较多的机房来讲,电费也会是一笔巨大的开支,为了这几微秒的时延,付出这么大的成本,是否值得呢?

其实还有另外一种思路,现在的CPU动辄十几或几十个核,我们可以把尽量多的应用集中到一台机器上,把这台机器的性能压榨到极致,进而减少机器数量,这样单台机器的运行成本虽然很高,但是总体成本也能降下来。泛欧交易所采用的就是这种思路。

建议:

  • 对时延极度敏感,集群规模小,用spin wait
  • 其他场景,根据喜好选择其他三种任选之一即可,性能不差,还省电

发表回复