几种唤醒方式的时延
1.背景
在多核世界里开发复杂系统,不可避免地要涉及多进程(线程)协作的场景。最常见的协作模式是生产者-消费者模式。生产者产生消息的时候,如何快速地唤醒消费者去处理,是一个基本问题。唤醒得越快,越有利于我们将系统内部时延降到最低。
Linux提供了众多的接口供我们实现这一功能,这里我们选取几个常用的进行分析和测量。
2.分析
a) pthread_cond_wait/signal
这个基本上是最常用的接口了,读者调用wait等待被唤醒,写者调用signal唤醒读者。
该接口要点如下:
- 如果写者signal的时候,读者并没有在wait,那么这次signal就丢了
- wait接口还有个timeout版本,可以指定超时时间,以避免长时间处于wait状态
- signal有个broadcast版本,可以批量唤醒一批进程/线程
- 这个接口要配合mutex使用,略显啰嗦
b) semaphore
信号量基本上是这里面历史最悠久的一个了,读者调用wait等待被唤醒,写者调用post唤醒读者。
该接口要点如下:
- 该接口基本可以看成是传递信号的消息队列
- 写者调用post时可以看做是往队列中放了一个信号
- 读者调用wait的时候也可以看成是从队列中取出了一个信号
- post和wait的操作是对等的,即有几个post就得有几个wait
c) eventfd
eventfd是近期版本的linux内核才加入的接口,要点如下:
- 句柄被实现为文件描述符,可以使用类似read/write的接口进行读写,也可以被poll、epoll等多路复用机制管理
- write的时候,可以写进去一个整数值,在read端可以自行合并,比如连续调用两次write,分别写入2、3,read的时候,读出来的是5
- 只能在有亲缘关系的进程/线程之间通信
- 不能实现类似broadcast这样的语义,一次只能唤醒一个读者线程
d) spin wait
抛弃操作系统提供的机制,读者自己去轮询查询是否有新数据。
3.测量
我们设计了一个测试程序,主要功能如下:
- 支持测量上述四种唤醒方式的时延
- 读者和写者需要绑定在同一处理器的不同核心上
- 将读者或写者绑定在CPU-0上(0号CPU负责处理中断)
- 使用rdtsc测量时延
*注意,上述规定十分重要,不然测试结果不可信。我之前进行过类似的测试,由于操作不当,结果完全是错的。
代码头文件如下:
#ifndef UTIL_H_
#define UTIL_H_
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <pthread.h>
//------------CPU绑定相关------------
void print_running_cpu()
{
char qry_cmd[1024] = { 0 };
sprintf(qry_cmd, "ps -o pid,spid,psr -T -p %d | grep %d | tail -n 1 | awk {'print $3'}",
getpid(), gettid());
FILE *fp=popen(qry_cmd,"r");
if(fp == NULL)
return ;
char cpu_id_str[200] = { 0 };
fgets(cpu_id_str,80,fp);
fclose(fp);
printf("[%d] : current thread(%d@%d) is running on cpu(%d)\n",
gettid(), gettid(), getpid(), atoi(cpu_id_str));
}
void print_thread_affinity()
{
cpu_set_t cpu_mask;
CPU_ZERO(&cpu_mask);
pthread_getaffinity_np(pthread_self(), sizeof(cpu_mask), &cpu_mask);
printf("[%d] : current thread(%d@%d) can be running at cpu(",
gettid(), gettid(), getpid());
int cpu_num = sysconf(_SC_NPROCESSORS_CONF);
for(int i = 0; i < cpu_num; ++i)
{
if (CPU_ISSET(i, &cpu_mask))//判断线程与哪个CPU有亲和力
{
printf("%d, ", i);
}
}
printf(")\n");
}
void bind_thread_to_cpu(int cpu_id)
{
cpu_set_t cpu_mask;
CPU_ZERO(&cpu_mask);
CPU_SET(cpu_id, &cpu_mask);
//print_thread_affinity();
printf("[%d] : binding current thread(%d@%d) to cpu(%d)\n",
gettid(), gettid(), getpid(), cpu_id);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_mask), &cpu_mask);
//print_thread_affinity();
}
//------------计时相关------------
uint64_t get_cpu_freq()
{
FILE *fp=popen("lscpu | grep CPU | grep MHz | awk {'print $3'}","r");
if(fp == NULL)
return 0;
char cpu_mhz_str[200] = { 0 };
fgets(cpu_mhz_str,80,fp);
fclose(fp);
return atof(cpu_mhz_str) * 1000 * 1000;
}
uint64_t get_tsc()
{
uint64_t a, d;
__asm__ volatile("rdtsc" : "=a"(a), "=d"(d));
return (d << 32) | a;
}
double get_time_diff_ns(uint64_t beg_tsc, uint64_t end_tsc)
{
return (end_tsc - beg_tsc) * 1.0 * 1000 * 1000 * 1000 / get_cpu_freq();
}
#endif
代码主逻辑如下:
#include "util.h"
#include <sys/time.h>
#include <time.h>
#include <semaphore.h>
#include <sys/eventfd.h>
// g++ -o wake_test wake_test.c -lpthread
// -m 唤醒方法:
// 0-pthread_cond_wait/signal
// 1-sem
// 2-event_fd
// 3-忙等轮询
// -l 测试轮次
// -s 休息间隔(毫秒)
//测试次数
long g_test_loop_times;
//发送间隔
long g_sleep_time_ms;
//测试方式
int g_method_id;
//记录时间的数组
//写者
volatile int g_write_pos = 0;
uint64_t *g_write_time;
//读者
volatile int g_read_pos = 0;
uint64_t *g_read_time;
//各种不同方式使用的全局变量
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
sem_t g_sem_queue;
int g_ev_fd; // event_fd
//利用不同的唤醒方式实现的测试函数
void write_by_method(int method_id)
{
//先写
g_write_time[g_write_pos] = get_tsc();
++g_write_pos;
//根据不同的方法唤醒读线程
switch(method_id)
{
case 0: //pthread_cond_wait/signal
pthread_mutex_lock(&g_mutex);
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_mutex);
break;
case 1: //sem
sem_post(&g_sem_queue);
break;
case 2: //eventfd
eventfd_write(g_ev_fd, 1);
break;
case 3: //不做任何唤醒操作,读线程忙等轮询
break;
default:
break;
};
}
int read_by_method(int method_id)
{
//根据不同的方法等待被唤醒
switch(method_id)
{
case 0: //pthread_cond_wait/signal
pthread_mutex_lock(&g_mutex);
pthread_cond_wait(&g_cond, &g_mutex);
pthread_mutex_unlock(&g_mutex);
break;
case 1: //sem
sem_wait(&g_sem_queue);
break;
case 2: //eventfd
eventfd_t value;
eventfd_read(g_ev_fd, &value);
break;
case 3: //忙等轮询
while(g_write_pos == g_read_pos)
;
break;
default:
break;
};
//读出(考虑合并读出的情况)
int read_cnt = 0;
while(g_write_pos != g_read_pos)
{
g_read_time[g_read_pos] = get_tsc();
++g_read_pos;
++read_cnt;
}
return read_cnt;
}
void print_result() //微秒
{
double sum_diff = 0.0;
double max_diff = 0.0;
double min_diff = 1.0*24*60*60*1000*1000;
for(int i=0; i<g_test_loop_times; ++i)
{
double this_diff = get_time_diff_ns(g_write_time[i], g_read_time[i]) / 1000;
//printf("%.02lf us\n", this_diff);
sum_diff += this_diff;
if(this_diff < min_diff)
min_diff = this_diff;
if(this_diff > max_diff)
max_diff = this_diff;
}
printf("MAX : %.02lf us\n", max_diff);
printf("MIN : %.02lf us\n", min_diff);
printf("AVG : %.02lf us\n", sum_diff / g_test_loop_times);
}
void* write_func(void* p_arg)
{
int bind_cpu_id = *(int *)p_arg;
bind_thread_to_cpu(bind_cpu_id);
long loop = g_test_loop_times;
while(loop--)
{
write_by_method(g_method_id);
usleep(g_sleep_time_ms * 1000);
}
return NULL;
}
void* read_func(void* p_arg)
{
int bind_cpu_id = *(int *)p_arg;
bind_thread_to_cpu(bind_cpu_id);
long loop = g_test_loop_times;
long read_cnt = 0;
while(1)
{
read_cnt += read_by_method(g_method_id);
if(read_cnt >= g_test_loop_times)
break;
}
print_result();
return NULL;
}
int main(int argc, char **argv)
{
g_test_loop_times = 100; //测试100次
g_sleep_time_ms = 1000; //每次休息1秒
g_method_id = 0; //使用忙等轮询作为唤醒方法
//解析命令行参数
int optval = 0;
while ((optval = getopt(argc, argv, "l:s:m:")) != EOF)
{
switch (optval)
{
case 'l':
g_test_loop_times = atoi(optarg);
break;
case 's':
g_sleep_time_ms = atoi(optarg);
break;
case 'm':
g_method_id = atoi(optarg);
break;
default:
break;
}
}
//初始化
g_write_time = (uint64_t *)malloc(sizeof(uint64_t) * g_test_loop_times);
g_read_time = (uint64_t *)malloc(sizeof(uint64_t) * g_test_loop_times);
sem_init(&g_sem_queue, 0, g_test_loop_times);
g_ev_fd = eventfd(0, 0);
//定义四个CPU
int cpu_id_0 = 0;
int cpu_id_1 = 1;
int cpu_id_2 = 2;
int cpu_id_3 = 3;
//启动读写线程
pthread_t thr_id_1, thr_id_2;
//读者2
pthread_create(&thr_id_1, NULL, read_func, &cpu_id_2);
sleep(1);
//写者
pthread_create(&thr_id_2, NULL, write_func, &cpu_id_0);
//空转,等待测试完成
while(1)
sleep(1);
return 0;
}
程序支持四个参数:
- -m用于指定唤醒机制:0是pthread接口;1是semaphore接口;2是eventfd;3是忙等轮询
- -l用于指定测试轮次,即测试几次求平均
- 0s用于指定每次测试之间的休息间隔,单位是毫秒,建议尽量设大一点,比如每次间隔休息1秒,这样可以避免写信号被合并读出,使测试数据偏小
我测试的机器配置如下:
- Intel Xeon E5-2637 v4 @3.5GHz
- 两个NUMA节点,共16核
测试循环100个轮次,每个轮次之间休息1秒,测试结果如下:
| 唤醒方法 | 最大(us) | 最小(us) | 平均(us) |
| pthread | 5.83 | 2.48 | 3.06 |
| semaphore | 4.56 | 2.25 | 2.66 |
| eventfd | 5.37 | 2.30 | 2.71 |
| spin wait | 0.31 | 0.1 | 0.14 |
4.结论
根据上面的测试结果,除了spin wait效果优秀外,其他三种唤醒方式都没有本质区别。因为前面三种都是基于操作系统中断的,注定快不到哪去,而spin wait只有CPU缓存同步的开销,因此极快。
但是spin wait会带来另外的开销:CPU需要一直满负荷跑,对机器寿命、散热、电源、机房安全等都是巨大的挑战。对于机器较多的机房来讲,电费也会是一笔巨大的开支,为了这几微秒的时延,付出这么大的成本,是否值得呢?
其实还有另外一种思路,现在的CPU动辄十几或几十个核,我们可以把尽量多的应用集中到一台机器上,把这台机器的性能压榨到极致,进而减少机器数量,这样单台机器的运行成本虽然很高,但是总体成本也能降下来。泛欧交易所采用的就是这种思路。
建议:
- 对时延极度敏感,集群规模小,用spin wait
- 其他场景,根据喜好选择其他三种任选之一即可,性能不差,还省电