Nginx源码学习:惊群的处理

Nginx 是一个多进程的模型, 后台进程包括一个 master 进程和多个 worker 进程.
master 进程负责和用户进行交互以及对 worker 进程的管理;
worker 进程进行各类事件的处理

Nginx 处理惊群

上面介绍过, nginx 中所有的 worker 进程都继承了 master 进程的监听 socketfd, 调用 epoll_wait 进程等待, 这样的话如果没有任何处理, 那么一个新连接到来时, 会有多个 worker 进程会唤醒, 但是只有一个 worker 进程会 accept 成功, 其余皆失败, 那么这样就导致了性能的浪费

accept已经不会再导致惊群的出现
linux 2.6 版本之前, 监听同一个 socketfd 的进程会挂在同一个等待队列上, 当请求到来时, 唤醒所有等待的进程,
2.6 版本之后, 引入了一个标记位WQ_FLAG_EXCLUSIVE, 设置等待队列的flagEXCLUSIVE, 表示一次只有一个进程会被唤醒, 当有新连接进来时, 在唤醒等待队列的函数里(__wake_up_common()), 会将传进来的参数(nr_exclusive, 值1)和WQ_FLAG_EXCLUSIVE&运算, 结果为真, 跳出循环

nginx的处理方式

  1. 使用锁
  2. 使用 epoll 的EPOLLEXCLUSIVE选项
  3. 使用SO_REUSEPORT选项

使用锁的方式

nginx 中自己实现了锁机制,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

typedef struct {
#if (NGX_HAVE_ATOMIC_OPS) //这个宏判断是否支持原子操作
ngx_atomic_t *lock;
#if (NGX_HAVE_POSIX_SEM) //是否支持信号量
ngx_atomic_t *wait;
ngx_uint_t semaphore;
sem_t sem;
#endif
#else //不支持原子变量, 使用文件锁, 效率较低
ngx_fd_t fd;
u_char *name;
#endif
ngx_uint_t spin; //获取锁时尝试的自旋次数,使用原子操作实现时才有意义
} ngx_shmtx_t;

不支持原子变量时, 会使用文件锁来实现互斥锁
支持原子变量时会使用变量会使用原子变量操作来实现一个自旋锁, 其中spin表示自旋次数,
而最新的 nginx 代码增加了这个支持信号量的判断是在支持信号量的情况下, 如果自旋次数达到了上限而进程还未获取到锁, 则进程会在信号量上阻塞, 进入睡眠状态; 不支持信号量情况下, 直接让出 CPU

  • ngx_event.c/ngx_event_process_init()中, 初始化 ngx_use_accept_mutex

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //如果使用了master worker的多进程模式.且worker进程数量>1,且配置文件里设置了使用accept_mutex
    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex)
    {
    ngx_use_accept_mutex = 1; //初始化该变量表示nginx使用accept互斥
    ngx_accept_mutex_held = 0; //表示当前是否已经持有锁
    ngx_accept_mutex_delay = ecf->accept_mutex_delay; //表示当获得锁失败后再次请求锁的价格时间

    }
    else
    {
    ngx_use_accept_mutex = 0;
    }
  • 配置文件中的 events区域下可以设置accept_mutex选项, 不设置默认为关闭

    1
    2
    3
    events {
    accept_mutex on;
    }
  • ngx_event_accept.c会初始化在ngx_event.c声明的变量ngx_accept_disabled, 这个变量是一个阈值

如果大于 0, 表示当前进程处于高负载,放弃接收新连接, 继续处理本进程事件, 并且ngx_accept_disabled减一, 表示让出了一次 accept 的机会, 负载减轻, 直到该值小于0, 重新进入低负载状态
小于 0, 表示处于低负载, 可以继续申请 accept 锁, 接收新连接

相当于实现了一个负载均衡

1
ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;	//当前连接数
  • ngx_event.c/ngx_process_events_and_timers()中判断是否使用锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    //每个worker进程在该函数中处理事件
    void
    ngx_process_events_and_timers(ngx_cycle_t *cycle)
    {
    ngx_uint_t flags;
    ngx_msec_t timer, delta;

    if (ngx_timer_resolution) {
    timer = NGX_TIMER_INFINITE;
    flags = 0;

    } else {
    timer = ngx_event_find_timer();
    flags = NGX_UPDATE_TIME;

    #if (NGX_WIN32)

    /* handle signals from master in case of network inactivity */

    if (timer == NGX_TIMER_INFINITE || timer > 500) {
    timer = 500;
    }

    #endif
    }

    //ngx_use_accept_mutex标志位表示是否需要对accept枷锁来解决惊群问题
    //当worker进程数>1且配置文件中
    if (ngx_use_accept_mutex)
    {
    if (ngx_accept_disabled > 0) //当前处于高负载,放弃竞争accept锁,
    {
    ngx_accept_disabled--; //负载减轻

    }
    else
    {
    //尝试获取锁,出错返回,成功会将监听套接字加入到当前的epoll事件驱动模块中
    if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR)
    {
    return;
    }
    //为1表示已经获得锁
    if (ngx_accept_mutex_held)
    {
    //该标记表示如果有新连接或者数据到来时,先保存该事件,等释放锁之后,在进行accept或者读取,
    //因为我们应该保证持有锁的时间尽可能短
    flags |= NGX_POST_EVENTS;
    }
    else
    {
    //没有获得锁就设置timer定时器,在epoll中休眠(中途可以被唤醒,比如有新连接到达),然后继续tryLock
    if (timer == NGX_TIMER_INFINITE
    || timer > ngx_accept_mutex_delay)
    {
    timer = ngx_accept_mutex_delay;
    }
    }
    }
    }

    //计算ngx_process_events函数执行花费的时间
    delta = ngx_current_msec;

    //在linux下使用epoll时,该函数对应的是ngx_epoll_process_events函数
    (void) ngx_process_events(cycle, timer, flags);

    //计算用时
    delta = ngx_current_msec - delta;

    //记录log
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
    "timer delta: %M", delta);
    //执行对应的新建立的连接的延迟队列中的事件
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    //如果持有锁,释放
    if (ngx_accept_mutex_held) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    //ngx_process_enevts耗时>0时,执行所有的超时事件
    if (delta) {
    ngx_event_expire_timers();
    }

    //执行普通事件延迟队列中的事件
    ngx_event_process_posted(cycle, &ngx_posted_events);
    }
  • 然后就是ngx_event_accept.c/ngx_trylock_accept_mutex()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45


    //尝试获取锁
    ngx_int_t
    ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
    {
    //尝试获取ngx_accept_mutex,非阻塞的
    if (ngx_shmtx_trylock(&ngx_accept_mute
    {
    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
    "accept mutex locked");
    //如果本来就拥有锁直接返回OK
    //ngx_accept_events宏是用于eventport的,eventport也是一个事件驱动库,他会每次调用完事件后自动清除,
    //会将ngx_accept_events置1需要重新添加
    //应该是不用管的
    if (ngx_accept_mutex_held && ngx_accept_events == 0) {
    return NGX_OK;
    }

    //将监听连接上的所有读事件添加到当前epoll事件合集
    if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
    return NGX_ERROR;
    }

    ngx_accept_events = 0;
    ngx_accept_mutex_held = 1;

    return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
    "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    //如果前面获取失败了,这还是1就要处理为0
    if (ngx_accept_mutex_held) {
    if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
    return NGX_ERROR;
    }

    ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
    }

nginx 采用锁的方式避免惊群的逻辑很简单, 就是只能有一个进程可以获得接收新连接的机会

采用EPOLLEXCLUSIVE

这个选项作为epoll_ctl的第二个参数, 是专门用来解决多个 epoll 句柄监听了同一个套接字导致的惊群问题

This new flag allows for exclusive wakeups when there are multiple epfds attached to a shared fd event source.
The implementation walks the list of exclusive waiters, and queues an event to each epfd, until it finds the first waiter that has threads blocked on it via epoll_wait(). The idea is to search for threads which are idle and ready to process the wakeup events. Thus, we queue an event to at least 1 epfd, but may still potentially queue an event to all epfds that are attached to the shared fd source.

当多个 epfd 添加了共享的fd事件源时,这个新标志允许独占唤醒。
该实现遍历独占等待列表,并将事件排队到每个epfd,直到找到第一个通过epoll_wait()在其上阻塞线程的等待者。其思想是搜索空闲的线程,并准备处理唤醒事件。因此,我们至少将一个事件排队到一个epfd,但是仍然可能将一个事件排队到所有附加到共享fd源的epfd。

该选项只支持和EPOLL_CTL_ADD使用

使用SO_REUSEPORT

这个就是交给内核去做负载均衡了


Nginx 惊群效率的考虑

Nginx 的进程数时固定的, 所以他发生惊群时, 导致的后果并没有那么严重, 反而当连接数非常大时, 关闭 accept_mutex, 效率还会提升
因为有巨大的连接量时, 如果还是每个进程互斥的去接收新连接, 效率可想而知, 但是所有的进程全部激活各自去抢, 反而效率会高一些

0%