Openresty Lua協程調度機制

猫猫哥 2021-08-15 23:59:57 阅读数:37

本文一共[544]字,预计阅读时长:1分钟~
openresty lua

寫在前面

OpenResty(後面簡稱:OR)是一個基於Nginx和Lua的高性能Web平臺,它內部集成大量的Lua API以及第三方模塊,可以利用它快速搭建支持高並發、極具動態性和擴展性的Web應用、Web服務或動態網關。

OR最大的特點就是,將Lua協程與Nginx事件驅動模型及非阻塞I/O結合起來。使用戶可以在handler中使用 同步但是依然是非阻塞 的方式編寫其應用代碼,而無需關心底層的協程調度以及與Nginx事件驅動模型的交互。

本文將先從總體上介紹OR的協程調度機制,然後結合源碼以及Lua棧的情况來詳細了解各個部分是如何實現的,包括其异常保護、協程初始化、協程的恢複和執行、協程的掛起、協程的執行結束、協程出錯的情况。

本文主要關注調度函數內部的邏輯,如果想了解外部的調用流程。可以參看Openresty Lua鉤子調用完整流程

注:lua-nginx模塊與stream-lua-nginx模塊的主體部分類似,後者實現相對簡單一點。下面的討論將基於stream-lua模塊。

為了防止歧義,文中用到的一些術語明確一下:

  • 主線程:錶示外層調用run_thread()的OS線程
  • 入口線程:每個handler被調用時會創建一個入口線程,用於執行lua代碼
  • 用戶線程:用戶在Lua代碼中通過ngx.thread.spawn()創建的線程
  • 用戶協程:用戶在Lua代碼中通過coroutine.create()創建的協程
  • 協程:泛指所有協程,包括入口線程、用戶線程和用戶協程
  • vm:錶示Lua虛擬機
  • L:視出現的上下文,一般錶示父協程,在創建入口線程的時候錶示Lua VM
  • co:一般錶示新創建的協程
  • L棧: |協程錶|新協程|頂|:錶示Lua棧結構,最右邊是棧頂

關鍵數據結構

在深入了解協程調度機制之前,我們先來認識一下主要的數據結構:

  • 協程上下文:ngx_stream_lua_co_ctx_t
    • 協程內部棧(coctx->co
    • 協程狀態(coctx->co_status
    • 維護協程之間關系的數據(父協程coctx->parent_co_ctx、僵屍子線程coctx->zombie_child_threads
    • 用戶相關數據(coctx->data
    • 在Lua的registry錶中對應該線程指針的引用值(co_ref
    • 一些狀態標記(是否是用戶線程is_uthread、是否因創建新線程thread_spawn_yielded被yield)
  • 模塊上下文:ngx_stream_lua_ctx_t
    • ctx->cur_co_ctx(當前調度協程上下文)
    • ctx->co_op(協程是以何種方式YIELD)
  • 核心調度函數:ngx_stream_lua_run_thread()

協程調度

首先你可能很好奇OR為什麼要在C引擎層面自己實現協程的調度?或者說這麼做的好處是什麼?我覺得最主要的原因還是减輕開發者的負擔。

原生Lua coroutine接口

我們知道Lua是個非常輕巧的語言,它不像Go有自己的調度器。Lua原生的對協程的操作無非就是coroutine.resume()coroutine.yield()。這兩者是成對出現的,協程coroutine.yield()之後肯定回到父協程coroutine.resume()的地方,恢複子協程需要顯式再次coroutine.resume()。如果要在Lua代碼層面實現非阻塞I/O,那麼父協程必須處理子協程I/O等待的情况,並在事件發生時恢複子協程的執行。如果需要同時進行多個任務,那麼父協程就需要負責多個協程間的調度。因為協程的拓撲可能是一個複雜的樹狀結構,所以協程的調度管理將變得异常複雜。

OpenResty實現

OR在C引擎層幫我們把這些事情都做了,你無須再關心所有這些,只需專心寫你的業務邏輯。為了支持同步非阻塞的方式編寫應用代碼,OR重寫了coroutine的接口函數,從而接管了協程的調度,並在coroutine基礎上封裝抽象出了thread的概念。無論是coroutine還是thread,I/O等待對於用戶都是透明的,用戶無需關心。兩者的主要區別是,coroutine父子之間的協作度更高,coroutine.yield()coroutine.resume()成對出現。在子協程執行完成(出錯)或者顯式coroutine.yield()之前,父協程一直處於等待狀態。而thread則由調度器進行調度,子thread一旦開始執行就不再受父協程控制了,在需要並發請求時很有用。thread提供了spawn()wait()等接口,spawn()執行參數中指定的函數,直到執行完畢、出錯或者I/O等待時返回。wait()則使父協程可以同步等待子線程執行完畢、獲取結果。

OR在對協程調度上,最核心的改動是其創建新協程時的行為(coroutine.resume(), ngx.thread.spawn())。它不會直接調用lua_resume(),而是先lua_yield()回到主線程,然後由主線程再根據情况lua_resume()下一個協程。Lua代碼域內從來不會直接調用lua_resume(),理解了這一點你就理解了OpenResty協程調度的精髓。

所以OR中協程拓撲是一個單層的結構,它只有一個入口點。這樣使得協程調度更加靈活,I/O事件的觸發時回調函數也更容易實現。

OR調度器根據lua_resume()的返回值,確定協程是掛起了、結束了還是出錯了。因為OR改動了創建新協程時行為,同時又抽象了thread概念,所以如果是協程掛起的情况,還需要知道是什麼原因掛起,以便做相應的不同處理。是繼續調度?還是返回上層?我們前面提到的ctx->co_op便是做這個用途。

協程的調度在核心調度函數ngx_stream_lua_run_thread()中進行,它是創建或恢複協程的唯一入口點。最初是由配置的Lua鉤子調用(圖中ssl_cert_handler()),如果碰到了I/O等待的情况,後續則由對應的事件handler(圖中的sleep_handler()read_handler())再次拉起。run_thread()裏面實現了一個調度循環,循環裏面先從ctx->cur_co_ctx獲取下一個待resume的協程上下文,然後lua_resume()執行或恢複該協程,其返回值LUA_YIELD錶示協程掛起,0錶示協程執行結束,其餘的錶示協程出錯了。其中協程掛起又分為四種不同的情况:即等待I/O、新建thread、coroutine.resume()coroutine.yield()。根據不同的情况,决定是跳到循環前面繼續恢複下一個協程,還是返回上層函數。

下圖是協程調度主要邏輯的示意圖,可以看到在Lua代碼域中無論是新建、掛起或恢複協程,都是先調用lua_yield()回到主線程。I/O操作例如ngx.tcp.receive()如果碰到了I/O等待,會在內部注册epoll事件(對於sleep的情况是定時器),然後自動lua_yield(),當事件觸發時繼續未完成的I/O操作,完成之後再調用run_thread()恢複之前被掛起的協程。

openresty-lua-coroutine-schedule

异常保護

作為一個調度器,OpenResty扮演者類似操作系統內核的角色,不過它的調度對象是Lua協程。作為一個“內核”,無論其調度對象出了什麼問題,都不應該使這個系統崩潰,而是應該將錯誤信息打印出來。

Openresty內部就做了一個這樣的异常保護,其原理就是用setjmplongjmp包住了run_thread()裏面的整個協程調度邏輯。

/* 首先注册虛擬機的panic回調 */
lua_atpanic(L, ngx_stream_lua_atpanic);
/* setjmp保存環境 */
NGX_LUA_EXCEPTION_TRY {
/* 執行調度邏輯 */
} NGX_LUA_EXCEPTION_CATCH {
/* 出現异常時走到這裏 */
dd("nginx execution restored");
}

ngx_stream_lua_atpanic()的實現也非常簡單,只是簡單地打印崩潰日志,然後調用NGX_LUA_EXCEPTION_THROW(1);恢複nginx的執行。

int
ngx_stream_lua_atpanic(lua_State *L)
{
#ifdef NGX_LUA_ABORT_AT_PANIC
abort();
#else
u_char *s = NULL;
size_t len = 0;
if (lua_type(L, -1) == LUA_TSTRING) {
s = (u_char *) lua_tolstring(L, -1, &len);
}
if (s == NULL) {
s = (u_char *) "unknown reason";
len = sizeof("unknown reason") - 1;
}
ngx_log_stderr(0, "lua atpanic: Lua VM crashed, reason: %*s", len, s);
ngx_quit = 1;
/* restore nginx execution */
NGX_LUA_EXCEPTION_THROW(1);
/* impossible to reach here */
#endif
}

這幾個宏定義分別如下:

#define NGX_LUA_EXCEPTION_TRY \
if (setjmp(ngx_stream_lua_exception) == 0)
#define NGX_LUA_EXCEPTION_CATCH \
else
#define NGX_LUA_EXCEPTION_THROW(x) \
longjmp(ngx_stream_lua_exception, (x))

協程初始化

鉤子的入口線程

ngx_stream_lua_new_thread()用於創建入口線程

OR中需要在Registry錶中存儲每個創建出來的Lua線程的reference,這個存儲協程的錶在Registry錶中對應的key是全局變量ngx_stream_lua_coroutines_key的地址,因此下面這段代碼就是從Registry錶中查詢這個儲存協程的錶,返回到棧頂:

/* 返回棧頂元素的索引,等於棧中元素的個數 */
base = lua_gettop(L);
/* 將存儲協程的錶對應的key壓棧 */
lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
coroutines_key));
/* 將key出棧,獲取Registry錶中key對應的元素,然後將結果入棧 */
lua_rawget(L, LUA_REGISTRYINDEX);

接下來創建一個新的協程,同時初始化其全局錶:

/* 創建Lua協程,返回的新lua_State跟原有的lua_State共享所有的全局對象(如錶),
但是有一個獨立的執行棧。 協程依賴垃圾回收銷毀 */
/* L棧: |協程錶|新協程|頂| */
co = lua_newthread(L);
/* 創建該協程的全局錶,設置_G field為全局錶自己 */
/* L棧: |協程錶|新協程|協程新的全局錶|頂| */
ngx_stream_lua_create_new_globals_table(co, 0, 0);
/* 再創建一個新錶 */
/* L棧: |協程錶|新協程|協程新的全局錶|新錶|頂| */
lua_createtable(co, 0, 1);
/* 拿到全局錶 */
/* L棧: |協程錶|新協程|協程新的全局錶|新錶|舊全局錶|頂| */
ngx_stream_lua_get_globals_table(co);
/* 新錶的__index的值為棧頂的值,也即就全局錶 */
/* L棧: |協程錶|新協程|協程新的全局錶|新錶|頂| */
lua_setfield(co, -2, "__index");
/* 新錶出棧,將其設為索引-2處即協程新的全局錶的元錶 */
/* L棧: |協程錶|新協程|協程新的全局錶|頂| */
lua_setmetatable(co, -2);
/* 設置協程新的全局錶到對應索引,其_G field是自己,
其元錶是新錶,新錶的__index是父協程的全局錶 */
/* L棧: |協程錶|新協程|頂| */
ngx_stream_lua_set_globals_table(co);

這一塊的邏輯有點繞,我們來稍微理一下,其實就是用新建的全局錶替換了舊的全局錶,其中新的全局錶的_G字段是它自己,新全局錶的元錶中__index元方法是舊的全局錶。

此時的Lua虛擬機棧頂情况如下圖所示:

L->top | 棧頂 |
L->top - 1 |Lua_State*| 新創建的協程
L->top -2 | Lua Table| 存儲協程引用的錶

下面一步就是在Lua虛擬機中為這個新協程創建一個reference:

/* 為棧頂對象(即新協程),創建並返回一個協程錶中的引用 */
/* 當前棧: |協程錶|頂| */
*ref = luaL_ref(L, -2);
if (*ref == LUA_NOREF) {
lua_settop(L, base); /* restore main thread stack */
return NULL;
}

最後恢複堆棧

/* 設置棧頂索引 */
/* 當前棧: |頂| */
lua_settop(L, base);
return co;

以上步驟還只是創建了一個什麼都不能做的Lua協程,回到_by_chunk()函數之後還需要把入口函數放入協程中。

/* 將lua虛擬機VM棧上的入口函數閉包移到新創建的協程棧上,
這樣新協程就有了虛擬機已經解析完畢的代碼了。*/
lua_xmove(L, co, 1);
/* 拿到co全局錶,放到棧頂 */
/* 當前棧: |入口closure|全局錶|頂| */
ngx_stream_lua_get_globals_table(co);
/* 將全局錶設為入口closure的環境錶 */
/* 當前棧: |入口closure|頂|*/
lua_setfenv(co, -2);

至此,協程入口函數以及環境錶已經設置好。接下來就是讓它能够運行起來,讓調度器能够調度它運行:

/* 將nginx請求保存到協程全局錶 */
ngx_stream_lua_set_req(co, r);
ctx->cur_co_ctx = &ctx->entry_co_ctx;
ctx->cur_co_ctx->co = co;
ctx->cur_co_ctx->co_ref = co_ref;

接下來就是注册cleanup鉤子,然後ngx_stream_lua_run_thread()

用戶創建的uthread

用戶線程由ngx.thread.spawn()創建,對應的C實現是ngx_stream_lua_uthread_spawn()。首先它會調ngx_stream_lua_coroutine_create_helper()創建一個新的協程。

創建協程

注意協程都是在worker的虛擬機上創建的(不考慮cache off的情况的話)。但是用戶協程會繼承父協程的全局錶,其父子關系由OR進行維護。

/* 獲取虛擬機 */
vm = ngx_stream_lua_get_lua_vm(r, ctx);
/* 創建新協程 */
co = lua_newthread(vm);
/* 然後創建coctx,設置其co、co_status值 */
coctx = ngx_stream_lua_create_co_ctx;
coctx->co = co;
coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;

此時父協程的棧如下:

/* 當前棧: |entry_func|args|頂| */

接下來將父協程的全局錶給新創建的協程:

/* make new coroutine share globals of the parent coroutine.
* NOTE: globals don't have to be separated! */
/* 拷貝父協程的全局錶到棧上 */
/* L棧: |entry_func|args|全局錶|頂| */
ngx_stream_lua_get_globals_table(L);
/* 將全局錶移動到新創建的協程co的棧上 */
/* L棧: |entry_func|args|頂| */
lua_xmove(L, co, 1);
/* 從新協程棧上寫入其的全局錶 */
ngx_stream_lua_set_globals_table(co);
/* 將新協程從進程虛擬機,移動到父協程中 */
/* L棧: |entry_func|args|新協程|頂| */
lua_xmove(vm, L, 1);
/* 入口函數拷貝到L棧頂 */
/* L棧: |entry_func|args|新協程|entry_func|頂|*/
lua_pushvalue(L, 1);
/* 將入口函數從L移到co棧中 */
/* L棧: |entry_func|args|新協程|頂| */
/* co棧: |entry_func|頂|*/
lua_xmove(L, co, 1);

create_helper函數返回之後,L的棧頂是新協程,co的棧頂是入口函數。

初始化uthread

ngx_stream_lua_coroutine_create_helper返回之後,進行uthread的初始化。

此時,父協程L是這樣的:

  • 棧頂是新創建的協程
  • 然後是參數和入口函數

在此之前,先在registry錶中保存一個該協程的ref。(到現在還沒搞明白這個ref是幹嘛用的?除了創建線程和删除線程,貌似只有檢查線程是否活著的時候會查一下這個ref,只是檢查狀態用coctx->co_status不是也能做到麼?8.12更新,之所以要把線程錨定到注册錶上,是為了防止被當成垃圾回收。這也解釋了為什麼只有線程需要錨定到注册錶上,而用戶協程不需要。因為用戶協程肯定由其父協程保留著一個引用。)

/* anchor the newly created coroutine into the Lua registry */
/* 把新創建的協程寫入Lua registry錶中 */
/* 將ngx_stream_lua_coroutines_key的地址壓入棧中 */
lua_pushlightuserdata(L, &ngx_stream_lua_coroutines_key);
/* 從registry錶中獲取協程錶 */
/* L棧: |entry_func|args|新協程|協程錶|頂|*/
lua_rawget(L, LUA_REGISTRYINDEX);
/* 將新協程壓棧 */
/* L棧: |entry_func|args|新協程|協程錶|新協程|頂|*/
lua_pushvalue(L, -2);
/* -2比特置是注册錶,為新協程創建在報錶中的索引 */
/* L棧: |entry_func|args|新協程|協程錶|頂| */
coctx->co_ref = luaL_ref(L, -2); //
/* 彈出協程錶 */
/* L棧: |entry_func|args|新協程|頂| */
lua_pop(L, 1);

接下來是初始化運行環境:

此時的,L的棧情况如下:

 |entry_func|參數1|...|參數n|新協程|
1 2 ... -2 -1
if (n > 1) {
/* 由於lua函數壓棧順序是從左到右
* 因此1就是壓入的第一個參數,而spawn的第一個參數就是入口函數
* 把棧頂元素(即新協程)移動到1,覆蓋入口函數,入口函數前面已經拷貝到新協程棧上了
*/
/* L棧: |新協程|args|頂| */
lua_replace(L, 1);
/* 將參數移到新協程棧中 */
/* L棧: |新協程|頂|*/
/* co棧: |入口函數|args|頂| */
lua_xmove(L, coctx->co, n - 1);
}

設置狀態,將父協程放入post_thread隊列中,設置協程的父子關系,設置新協程為下一個調度的線程

/* 設置狀態 */
coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
ctx->co_op = NGX_STREAM_LUA_USER_THREAD_RESUME;
ctx->cur_co_ctx->thread_spawn_yielded = 1;
/* 將父協程放入post_thread隊列中 */
ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx)
/* 保存子線程的父協程上下文為當前協程 */
coctx->parent_co_ctx = ctx->cur_co_ctx;
/* 切換當前協程為新創建的協程 */
ctx->cur_co_ctx = coctx;

最後,spawn函數的返回值是新創建的協程

/* 將原協程的執行權切換出去,這裏的參數1錶示棧上留了一個值,這裏是指新創建的協程
* 主線程並不會取這個值,而是等到新線程spawn返回時作為返回值。
* 此時L棧中是新協程,co棧中是參數和入口函數。
*/
return lua_yield(L, 1);

用戶創建的coroutine

OR替換了原生的coroutine接口,當存在getfenv(0).__ngx_req時(全局環境保存了nginx請求),使用重寫後的coroutine接口函數。

coroutine.create()創建新協程部分跟uthread是一樣的,都是調用ngx_stream_lua_coroutine_create_helper()。Lua函數返回新協程。此時新協程棧中是入口函數。

coroutine.resume()用於開始或恢複新協程,其對應的C函數是ngx_http_lua_coroutine_resume()

/* 首先,獲取到協程 */
/* L棧: |co|參數|, co棧: |入口函數| */
co = lua_tothread(L, 1);
/* 然後設置狀態和父子關系 */
/* 父協程為normal */
p_coctx->co_status = NGX_HTTP_LUA_CO_NORMAL;
coctx->parent_co_ctx = p_coctx;
dd("set coroutine to running");
/* 子協程為running */
coctx->co_status = NGX_HTTP_LUA_CO_RUNNING;
/* 設置co_op告知主線程yield類型 */
ctx->co_op = NGX_HTTP_LUA_USER_CORO_RESUME;
/* 設置下一個調度協程為新協程 */
ctx->cur_co_ctx = coctx;

接下來,將控制權交還給主協程,並把參數傳給主線程。

/* 此時L棧: |co|參數|, co棧: |入口函數| */
/* lua_gettop(L) - 1錶示留在棧中的返回值個數,
* 由主線程取用之後,在lua_resume新協程時傳遞 */
/* 减一個,錶示不傳底下的co */
return lua_yield(L, lua_gettop(L) - 1);

協程執行和恢複

OR中協程的執行和恢複總是由主線程來進行,不管是coroutine.resume()還是ngx.thread.spawn(),都是先lua_yield()回到主線程之後,在主線程中lua_resume()

注意到前面創建階段,thread是lua_yield(L, 1),coroutine是lua_yield(L, lua_gettop(L) - 1)。yield到主線程之後,我們繼續看調度程序的處理。

uthread

先獲取參數個數

/* 因為入口函數和參數已經在新線程棧中了,所以從新協程中獲取參數個數,-1是除掉入口函數 */
nrets = lua_gettop(ctx->cur_co_ctx->co) - 1;

然後跳到主循環的前面,執行新線程

/* 保存新協程coctx */
orig_coctx = ctx->cur_co_ctx;
/* 執行新線程,其中nrets為參數個數 */
rv = lua_resume(orig_coctx->co, nrets);

lua_resume中就會開始新線程的執行。當新線程執行完畢或因I/O中斷yield之後,會恢複父協程。在恢複父協程之前,先設置參數個數為1,即之前留在棧上的新協程co。恢複父協程之後,ngx.thread.spawn()函數就返回了。

if (ctx->cur_co_ctx->thread_spawn_yielded) {
ctx->cur_co_ctx->thread_spawn_yielded = 0;
nrets = 1;
}

coroutine

同樣是先獲取參數個數

/* 獲取父協程 */
old_co = ctx->cur_co_ctx->parent_co_ctx->co;
/* 因為參數還在父協程棧中,所以從父協程棧中獲取參數個數 */
nrets = lua_gettop(old_co);
if (nrets) {
/* 將參數從父協程移到子協程 */
lua_xmove(old_co, ctx->cur_co_ctx->co, nrets);
}

此時子協程棧中是參數和入口函數。

然後跳到主循環的前面,執行新協程,跟前面uthread時一樣。

協程掛起

協程的掛起分為兩種情况:

  • 一種是內部在I/O等待時自動掛起,這種情况用戶不用參與,OR會自動將相應的事件及其handler掛到事件驅動上,當事件被喚醒時繼續未完成的I/O操作,完成之後由調度器恢複之前掛起的協程。
  • 另一種是用戶在Lua代碼主動調用coroutine.yield()掛起。此時由調度器根據情况决定執行下一個執行的協程。

顯式主動掛起

我們先來看用戶主動掛起的情况,coroutine.yield()對應的C函數為ngx_stream_lua_coroutine_yield()。我們先來看看它裏面幹了些什麼。

/* 首先修改當前協程的狀態為掛起 */
coctx = ctx->cur_co_ctx;
coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;
/* 設置co_op */
ctx->co_op = NGX_STREAM_LUA_USER_CORO_YIELD;
/* 如果不是用戶線程(也即是普通coroutine),且有父協程,
將其父協程狀態設置為running */
if (!coctx->is_uthread && coctx->parent_co_ctx) {
coctx->parent_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
}
/* 最後將控制權交還主線程,將所有yield參數傳遞給主線程 */
return lua_yield(L, lua_gettop(L));

回到主線程之後,根據待掛起協程是thread還是corotine進行不同處理。

thread

if (ngx_stream_lua_is_thread(ctx)) {
/* 丟弃coroutine.yield()的任何參數 */
lua_settop(ctx->cur_co_ctx->co, 0);
/* 因為thread由調度器負責調度,所以將當前線程的狀態改為running,為什麼不在前面一起改?*/
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
/* 如果已經有pending的線程,則放到隊列中 */
if (ctx->posted_threads) {
ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx);
ctx->cur_co_ctx = NULL;
return NULL;
}
/* 否則,立即恢複線程 */
}

coroutine

/* 獲取當前棧的高度,也即coroutine.yield()的參數個數 */
nrets = lua_gettop(ctx->cur_co_ctx->co);
/* 設置父協程為下一個調度的協程 */
next_coctx = ctx->cur_co_ctx->parent_co_ctx;
next_co = next_coctx->co;
/* 將參數從子協程棧中移到父協程棧中 */
if (nrets) {
dd("moving %d return values to next co", nrets);
lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
#ifdef NGX_LUA_USE_ASSERT
ctx->cur_co_ctx->co_top -= nrets;
#endif
}
/* 如果不是wrap封裝的,還要加一個true,作為第一個參數 */
if (!ctx->cur_co_ctx->is_wrap) {
/* prepare return values for coroutine.resume
* (true plus any retvals)
*/
lua_pushboolean(next_co, 1);
/* 插入1的比特置,作為第一個參數 */
lua_insert(next_co, 1);
nrets++; /* add the true boolean value */
}
ctx->cur_co_ctx = next_coctx;
/* 回到主循環的前面,resume父協程 */
break;

I/O等待場景

I/O等待的場景有很多,不過其背後的原理都差不多:

  • 定義一個事件,設置恢複時的handler及對應協程上下文,然後lua_yield()回到run_thread()
  • 主線程將ctx->cur_co_ctx設為空之後,直接返回NGX_AGAIN,如果有posted_thread會繼續執行,否則將控制權交還給nginx層
  • 後續當事件發生時,繼續未完成的操作,完成之後將保存的協程上下文設為ctx->cur_co_ctx,然後調用ngx_stream_lua_run_thread()恢複協程的執行。

這裏舉兩個典型的例子:

ngx.sleep()

它的C函數實現是ngx_stream_lua_ngx_sleep(),先定義設置好handler和coctx,掛上定時器,然後lua_yield()

 ngx_stream_lua_cleanup_pending_operation(coctx);
coctx->cleanup = ngx_stream_lua_sleep_cleanup;
coctx->data = r;
/* 保存恢複時的handler和協程上下文 */
coctx->sleep.handler = ngx_stream_lua_sleep_handler;
coctx->sleep.data = coctx;
coctx->sleep.log = r->connection->log;
/* 當delay為0時,放入post_event隊列或添加定時器 */
if (delay == 0) {
#ifdef HAVE_POSTED_DELAYED_EVENTS_PATCH
dd("posting 0 sec sleep event to head of delayed queue");
coctx->sleep.delayed = 1;
ngx_post_event(&coctx->sleep, &ngx_posted_delayed_events);
#else
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "ngx.sleep(0)"
" called without delayed events patch, this will"
" hurt performance");
ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
#endif
} else { /* 添加定時器 */
ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
}
/* 外層函數*/
return lua_yield(L, 0);

run_thread()裏將當前協程上下文置為NULL,然後返回NGX_AGAIN

by_chunk()裏會先檢查有沒有在post隊列裏的線程,如果沒有則返回

 rc = ngx_stream_lua_run_thread(L, r, ctx, 0);
if (rc == NGX_ERROR || rc >= NGX_OK) {
/* do nothing */
} else if (rc == NGX_AGAIN) {
rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 0);
} else if (rc == NGX_DONE) { /* 這裏DONE的情况只有HTTP子請求的時候會出現 */
rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 1);
} else {
rc = NGX_OK;
}

當定時器超時時,它會執行sleep_handler(),設置ctx->cur_co_ctx然後執行run_thread()恢複協程調度。

ngx.tcp.receive()

其對應的C函數實現是ngx_stream_lua_socket_tcp_receive(),裏面會調ngx_stream_lua_socket_tcp_receive_helper()。碰到讀等待的情况,也是先設置好handler和coctx,然後lua_yield()。我們來看下裏面代碼:

 /* 這裏0錶示還未進行協程切換 */
u->read_waiting = 0;
u->read_co_ctx = NULL;
/* 讀取的主要邏輯由此函數處理 */
rc = ngx_stream_lua_socket_tcp_read(r, u);
/* 不管是成功、出錯或等待I/O,肯定會返回 */
if(rc == NGX_ERROR) {
/*...*/
}
if(rc == NGX_OK) {
/*...*/
}
/* rc == NGX_AGAIN */
/* 如果是等待I/O的情况,設置事件觸發時的handler、當前協程上下文 */
u->read_event_handler = ngx_stream_lua_socket_read_handler;
coctx = lctx->cur_co_ctx;
/* 設置請求的寫事件handler,這個是返回到Lua層前調用的handler */
r->write_event_handler = ngx_stream_lua_content_wev_handler;
/* 保存當前協程上下文到u上 */
u->read_co_ctx = coctx;
/* 錶示是後續是需要協程恢複的 */
u->read_waiting = 1;
/* 設置准備返回值的回調 */
u->read_prepare_retvals = ngx_stream_lua_socket_tcp_receive_retval_handler;
return lua_yield(L, 0);

回到run_thread(),同樣是將當前協程上下文置為NULL,然後返回NGX_AGAIN

當事件被觸發時,執行前面設置的ngx_stream_lua_socket_read_handler(),裏面又會調用讀取操作核心函數ngx_stream_lua_socket_tcp_read()。如果繼續碰到等待I/O,handler直接結束,等待下一次事件。如果是完成或出錯,會執行如下操作:

/* 恢複該值為0 */
u->read_waiting = 0;
/* 獲取協程上下文 */
coctx = u->read_co_ctx;
/* 設置協程恢複的handler */
ctx->resume_handler = ngx_stream_lua_socket_tcp_read_resume;
/* 設置下一個調度的上下文,為之前調用讀取操作的協程 */
ctx->cur_co_ctx = coctx;
/* 這個handler就是yield之前設置的那個,它裏面調用 ctx->resume_handler */
r->write_event_handler(r);

r->write_event_handler(r);是返回Lua層前調用的handler,裏面會調用resume_handlerngx_stream_lua_socket_tcp_read_resume()只是封裝了一下,最終都是調用的ngx_stream_lua_socket_tcp_resume_helper(),我們看來下它的代碼:

/* 待恢複協程上下文 */
coctx = ctx->cur_co_ctx;
u = coctx->data;
prepare_retvals = u->read_prepare_retvals;
/* 准備返回值 */
nret = prepare_retvals(r, u, ctx->cur_co_ctx->co);
/* 恢複協程調度,回到Lua層 */
rc = ngx_stream_lua_run_thread(vm, r, ctx, nret);

至於完成的條件,取决與不同的調用方式。如果是讀取固定字節數的話,會維護一個剩餘待讀取的字節數u->rest。如果是讀取一行,則讀取到\n就結束。如果是readall,則一直讀到u->eof為止。

協程執行完畢

為了不失完整性,再說一下正常結束和出錯時的情况。正常執行完畢時,會設置協程狀態,然後清理它的僵屍子線程:

/* 將當前協程狀態置為DEAD */
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_DEAD;
/* 如果子線程有僵屍線程,則清理之 */
if (ctx->cur_co_ctx->zombie_child_threads) {
ngx_stream_lua_cleanup_zombie_child_uthreads(
r, L, ctx, ctx->cur_co_ctx);
}

接下來,根據結束的協程的類型不同執行不同的操作:

入口線程

此時直接删除線程即可,然後根據是否還有用戶線程,選擇返回NGX_AGAINNGX_OK

if (ngx_stream_lua_is_entry_thread(ctx)) {
/* 將虛擬機棧清空 */
lua_settop(L, 0);
/* 删除當前線程,會從REGISTY錶中解引用當前協程的`coctx->co_ref` */
ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
/* 如果還有其他用戶線程,返回NGX_AGAIN */
if (ctx->uthreads) {
ctx->cur_co_ctx = NULL;
return NGX_AGAIN;
}
/* all user threads terminated already */
goto done; /* 到這就圓滿結束了 return NGX_OK; */
}

用戶線程

此時如果父協程已經死了,處理方式跟入口線程一樣,即删除線程,然後根據是否還有任何用戶線程或入口線程,選擇返回NGX_AGAINNGX_OK

如果父協程還活著,並且已經在wait它了,直接恢複父協程。否則,加入到父協程的僵屍線程列錶中。

if (ctx->cur_co_ctx->is_uthread) {
/* 清空虛擬機棧 */
lua_settop(L, 0);
/* 獲取父協程 */
parent_coctx = ctx->cur_co_ctx->parent_co_ctx;
/* 如果父協程還活著 */
if (ngx_stream_lua_coroutine_alive(parent_coctx)) {
/* 並且在wait當前線程,則恢複父協程 */
if (ctx->cur_co_ctx->waited_by_parent) {
ngx_stream_lua_probe_info("parent already waiting");
ctx->cur_co_ctx->waited_by_parent = 0;
success = 1;
goto user_co_done;
}
/* 否則將當前線程掛到父協程的僵屍子線程中 */
if (ngx_stream_lua_post_zombie_thread(r, parent_coctx,
ctx->cur_co_ctx)
!= NGX_OK)
{
return NGX_ERROR;
}
/* 壓入第一個返回值true,以備後續wait時返回 */
lua_pushboolean(ctx->cur_co_ctx->co, 1);
lua_insert(ctx->cur_co_ctx->co, 1);
/* 設置當前線程狀態為ZOMBIE */
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_ZOMBIE;
ctx->cur_co_ctx = NULL;
return NGX_AGAIN; /* 返回上層 */
}
/* 如果父協程已經死了,直接删除當前線程
* 會從REGISTY錶中解引用當前協程的`coctx->co_ref` */
ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
ctx->uthreads--;
/* 如果沒有用戶線程了 */
if (ctx->uthreads == 0) {
/* 入口線程在活著,返回上層 */
if (ngx_stream_lua_entry_thread_alive(ctx)) {
ctx->cur_co_ctx = NULL;
return NGX_AGAIN;
}
/* all threads terminated already */
goto done; /* 到這就圓滿結束了 return NGX_OK; */
}
/* 如果還有其他用戶線程,返回上層 */
ctx->cur_co_ctx = NULL;
return NGX_AGAIN;
}

用戶協程

剩下的就是用戶協程的情况,這個情况跟用戶線程被父協程wait的情况是一樣的。主要是將返回值移動到父協程棧中,然後跳到主循環前面恢複父協程的執行。

success = 1;
/* 獲取返回值個數 */
nrets = lua_gettop(ctx->cur_co_ctx->co);
next_coctx = ctx->cur_co_ctx->parent_co_ctx;
next_co = next_coctx->co;
/* 將返回值移到父協程棧中 */
if (nrets) {
lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
}
/* 如果是用戶線程,删除之 */
if (ctx->cur_co_ctx->is_uthread) {
ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
ctx->uthreads--;
}
/* 除了wrap的用戶協程,加上第一個true的返回值 */
if (!ctx->cur_co_ctx->is_wrap) {
/* ended successfully, coroutine.resume returns true plus
* any return values
*/
lua_pushboolean(next_co, success);
lua_insert(next_co, 1);
nrets++;
}
/* 設置父協程的狀態為RUNNING */
ctx->cur_co_ctx = next_coctx;
next_coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
/* 回到主循環前面,恢複父協程的執行 */
continue;

出錯的情况

大致處理步驟是,恢複cur_co_ctx,獲取虛擬機L棧上錯誤信息,獲取當前協程棧中錯誤信息,後面的操作類似協程執行完畢時,根據不同的情况選擇恢複父協程或者返回上層。

/* 恢複cur_co_ctx */
if (ctx->cur_co_ctx != orig_coctx) {
ctx->cur_co_ctx = orig_coctx;
}
/* 設置當前協程狀態為DEAD */
ctx->cur_co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD;
/* 獲取錯誤信息 */
if (orig_coctx->is_uthread
|| orig_coctx->is_wrap
|| ngx_http_lua_is_entry_thread(ctx))
{
ngx_http_lua_thread_traceback(L, orig_coctx->co, orig_coctx);
trace = lua_tostring(L, -1);
if (lua_isstring(orig_coctx->co, -1)) {
msg = lua_tostring(orig_coctx->co, -1);
dd("user custom error msg: %s", msg);
} else {
msg = "unknown reason";
}
}

用戶線程

跟正常結束的處理一樣,除了第一個返回值是false。

此時如果父協程已經死了,直接删除線程,然後根據是否還有任何用戶線程或入口線程,選擇返回NGX_AGAINNGX_OK

如果父協程還活著,並且已經在wait它了,直接恢複父協程。否則,加入到父協程的僵屍線程列錶中。

入口線程

ngx_stream_lua_request_cleanup()清理當前請求,裏面會清理掉所有的用戶創建的協程,然後清理入口協程自己。最後返回錯誤碼。

用戶協程

如果是wrap的協程,將錯誤傳遞給父協程(就好像是父協程出錯了,然後父協程重新走一遍上面的出錯處理流程)。

如果是普通協程,則恢複父協程的執行,返回false和錯誤信息。

參考資料

本博客已經遷移至CatBro's Blog,那裏是我自己搭建的個人博客,頁面效果比這邊更好,支持站內搜索,評論回複還支持郵件提醒,歡迎關注。這邊只會在有時間的時候不定期搬運一下。

本篇文章鏈接

版权声明:本文为[猫猫哥]所创,转载请带上原文链接,感谢。 https://gsmany.com/2021/08/20210815235927056b.html