关于 redis lua 的使用大家都不陌生,应用场景需要把复杂逻辑的原子性,比如计数器,分布式锁。见过没用 lua 实现的锁,不出 bug 也算是神奇

好奇实现的细节,阅读了几个版本,本文源码展示为 3.2 版本, 7.0 重构比较多,看着干净一些

一致性

1
2
3
4
5
6
redis> EVAL "return { KEYS[1], KEYS[2], ARGV[1], ARGV[2], ARGV[3] }" 2 key1 key2 arg1 arg2 arg3
1) "key1"
2) "key2"
3) "arg1"
4) "arg2"
5) "arg3"

上面是简单的测试用例,其中 2 表示紧随其后的两个参数是 key, 我们通过 KEYS[i] 来获取,后面的是参数,通过 ARGV[i] 获取。我司历史上遇到过一次 redis 主从数据不一致的情况,原因比较简单:

Lua 脚本需要 hgetall 拿到所有数据,但是依赖顺序,恰好此时底层结构 master 己经变成了 hashtable , 但是 slave 还是 ziplist , 获取到的第一个数据当成 key 去做其它逻辑,导致主从不一致发生

引出使用 redis lua 最佳实践之一: 无论单机还是集群模式,对于 key 的操作必须通过参数列表,显示的传进去,而不能依赖脚本或是随机逻辑

结论其实显而易见,会有数据不一致的风险,同时对于 cluster 模式,要求所有 keys 所在的 slot 必须在同一个 shard 内,这个检测是在 smart client 或者是 cluster proxy 端

导致问题的原因在于,redis 旧版本同步时,本质上还是直接执行的 lua 脚本,这种模式叫做 verbatim replication . 如果只同步 lua 脚本修改的内容可以避免这类 issue, 类似于 mysql binlog 的 SQL 模式和 ROW 模式的区别(也不完全一样)

实际上 redis 也是这么做的,3.2 版本引入 redis.replicate_commands() , 只同步变更的内容,称为 effects replication 模式。5.0 lua 默认为该模式,在 7.0 中移除了旧版本的 verbatim replication 的支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int luaRedisGenericCommand(lua_State *lua, int raise_error) {
......
/* If we are using single commands replication, we need to wrap what
* we propagate into a MULTI/EXEC block, so that it will be atomic like
* a Lua script in the context of AOF and slaves. */
if (server.lua_replicate_commands &&
!server.lua_multi_emitted &&
server.lua_write_dirty &&
server.lua_repl != PROPAGATE_NONE)
{
execCommandPropagateMulti(server.lua_caller);
server.lua_multi_emitted = 1;
}

/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (server.lua_replicate_commands) {
/* Set flags according to redis.set_repl() settings. */
if (server.lua_repl & PROPAGATE_AOF)
call_flags |= CMD_CALL_PROPAGATE_AOF;
if (server.lua_repl & PROPAGATE_REPL)
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c, call_flags);
......
}

3.2 版本中,当 lua 虚拟机执行 redis.call 或者 redis.pcall 时调用 luaRedisGenericCommand , 如果开启了 lua_replicate_commands 选项,那么生成一个 multi 事务命令用于复制

同时 call 去真正执行命令时,call_flags 打上 CMD_CALL_PROPAGATE_AOF CMD_CALL_PROPAGATE_REPL 标签,执行命令时生成同步命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void evalGenericCommand(client *c, int evalsha) {
......
/* If we are using single commands replication, emit EXEC if there
* was at least a write. */
if (server.lua_replicate_commands) {
preventCommandPropagation(c);
if (server.lua_multi_emitted) {
robj *propargv[1];
propargv[0] = createStringObject("EXEC",4);
alsoPropagate(server.execCommand,c->db->id,propargv,1,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(propargv[0]);
}
}
......
}

略去无关代码, evalGenericCommand 函数最后判断,如果处于 effects replication 模式,那么只通过事务去执行产生的命令,而不是同步 lua 脚本,生成一个 exec 命令

另外为了保证 deterministic 确定性,redis lua 做了以下事情:

  • redis lua 不允许获取系统时间或者外部状态
  • 修改了伪随机函数 math.random , 使用同一个种子,使得每次获取得到随机序列是一样的(除非指定了 math.randomseed)
  • 有些命令返回的结果是没有排序的,比如 SMEMBERS , 4.0 版本 redids lua 会额外的做一次排序再返回。但是 5.0 后去掉了这个排序,因为前面提到的 effects replication 避免了这个问题,但是使用时不要假设有任何排序,是否排序要看普通命令的文档说明
  • 当用户的脚本调用 RANDOMKEY , SRANDMEMBER , TIME 随机命令后,尝试去修改数据库,会报错。但是只读的 lua 脚本可以调用这些 non-deterinistic 命令
  • 缓存

    一般我们用 eval 命令执行 lua 脚本内容,但是对于高频执行的脚本,每次都要从文本中解析生成 function 开销会很高,所以引入了 evalsha 命令

    1
    2
    > script load "redis.call('incr', KEYS[1])"
    "072f8f1f3cac2bbf3017c4af7c73e742aa085ac5"

    先调用 script load 生成对应脚本的 hash 值,每次执行时只需要传入 hash 值即可

    1
    2
    > EVALSHA da0bf4095ef4b6f337f03ba9dcd326dbc5fc8ace 1 testkey
    (nil)

    对于 failover, 或第一次执行时 redis 不存在该 lua 函数则报错

    1
    2
    > EVALSHA da0bf4095ef4b6f337f03ba9dcd326dbc5fc8aca 1 testkey
    (error) NOSCRIPT No matching script. Please use EVAL.

    所以,我们在封装 redis client 时要处理异常情况

  • Client 初始化计算 sha 值后,直接 evalsha 调用脚本
  • 如果失败,返回错误是 NOSCRIPT , 再调用 SCRIPT LOAD 创建 lua 函数,Client 再正常调用 evalsha
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    void scriptCommand(client *c) {
    ......
    } else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"load")) {
    char funcname[43];
    sds sha;

    funcname[0] = 'f';
    funcname[1] = '_';
    sha1hex(funcname+2,c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
    sha = sdsnewlen(funcname+2,40);
    if (dictFind(server.lua_scripts,sha) == NULL) {
    if (luaCreateFunction(c,server.lua,funcname,c->argv[2])
    == C_ERR) {
    sdsfree(sha);
    return;
    }
    }
    addReplyBulkCBuffer(c,funcname+2,40);
    sdsfree(sha);
    forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
    ......
    }

    命令入口函数 scriptCommand LOAD 名字很不直观,以为是个只读命令,但实际上做了很多事情:

  • 计算 sha 值
  • luaCreateFunction 创建运行时函数
  • forceCommandPropagation 设置 flag 参数用于复制到从库或者 AOF
  • Lua 源码走读

    初始化

    初始化只需看 scriptingInit 函数,主要功能是加载 lua 库(cjson, table, string, math …), 移除不支除的函数(loadfile, dofile), 注册我们常用的命令表到 lua table 中(call, pcall, log, math, random …), 最后创建虚拟的 redisClient 用于执行命令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    void scriptingInit(int setup) {
    lua_State *lua = lua_open();

    if (setup) {
    server.lua_client = NULL;
    server.lua_caller = NULL;
    server.lua_timedout = 0;
    server.lua_always_replicate_commands = 0; /* Only DEBUG can change it.*/
    ldbInit();
    }

    luaLoadLibraries(lua);
    luaRemoveUnsupportedFunctions(lua);

    /* Initialize a dictionary we use to map SHAs to scripts.
    * This is useful for replication, as we need to replicate EVALSHA
    * as EVAL, so we need to remember the associated script. */
    server.lua_scripts = dictCreate(&shaScriptObjectDictType,NULL);

    /* Register the redis commands table and fields */
    lua_newtable(lua);

    /* redis.call */
    lua_pushstring(lua,"call");
    lua_pushcfunction(lua,luaRedisCallCommand);
    lua_settable(lua,-3);
    ......
    }

    这里面涉及 c 如何与 lua 语言交互,如何互相调用的问题,不用深究用到了再学即可

    lua_newtable(lua); 创建 lua table 并入栈,此时位置是 -1

    lua_pushstring(lua,"call"); 入栈字符串 call

    lua_pushcfunction(lua,luaRedisCallCommand); 入栈函数 luaRedisCallCommand

    lua_settable(lua,-3); 生成命令表,此时 table 位置是 -3,然后一次从栈中弹出,即伪代码为 table["call"] = luaRedisCallCommand

    1
    eval "redis.call('incr', KEYS[1])" 1 testkey

    这也就是为什么我们的 lua 脚本可以执行 redis 命令的原因,函数查表去执行。其它命令也同理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /* Replace math.random and math.randomseed with our implementations. */
    lua_getglobal(lua,"math");

    lua_pushstring(lua,"random");
    lua_pushcfunction(lua,redis_math_random);
    lua_settable(lua,-3);

    lua_pushstring(lua,"randomseed");
    lua_pushcfunction(lua,redis_math_randomseed);
    lua_settable(lua,-3);

    lua_setglobal(lua,"math");

    这里也看到同时修改了 random 函数行为

    执行命令

    eval 函数总入口是 evalCommand , 这里参考 3.2 源码,非 debug 模式下执行调用 evalGenericCommand , 函数比较长,主要分三大块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    void evalGenericCommand(client *c, int evalsha) {
    lua_State *lua = server.lua;
    char funcname[43];
    long long numkeys;
    int delhook = 0, err;

    /* When we replicate whole scripts, we want the same PRNG sequence at
    * every call so that our PRNG is not affected by external state. */
    redisSrand48(0);

    /* We set this flag to zero to remember that so far no random command
    * was called. This way we can allow the user to call commands like
    * SRANDMEMBER or RANDOMKEY from Lua scripts as far as no write command
    * is called (otherwise the replication and AOF would end with non
    * deterministic sequences).
    *
    * Thanks to this flag we'll raise an error every time a write command
    * is called after a random command was used. */
    server.lua_random_dirty = 0;
    server.lua_write_dirty = 0;
    server.lua_replicate_commands = server.lua_always_replicate_commands;
    server.lua_multi_emitted = 0;
    server.lua_repl = PROPAGATE_AOF|PROPAGATE_REPL;

    /* Get the number of arguments that are keys */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
    return;
    if (numkeys > (c->argc - 3)) {
    addReplyError(c,"Number of keys can't be greater than number of args");
    return;
    } else if (numkeys < 0) {
    addReplyError(c,"Number of keys can't be negative");
    return;
    }

    命令执行前的检查阶段,设置随机种子,设置一些 flag, 并检查 keys 个数是否正确

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
     /* We obtain the script SHA1, then check if this function is already
    * defined into the Lua state */
    funcname[0] = 'f';
    funcname[1] = '_';
    if (!evalsha) {
    /* Hash the code if this is an EVAL call */
    sha1hex(funcname+2,c->argv[1]->ptr,sdslen(c->argv[1]->ptr));
    } else {
    /* We already have the SHA if it is a EVALSHA */
    int j;
    char *sha = c->argv[1]->ptr;

    /* Convert to lowercase. We don't use tolower since the function
    * managed to always show up in the profiler output consuming
    * a non trivial amount of time. */
    for (j = 0; j < 40; j++)
    funcname[j+2] = (sha[j] >= 'A' && sha[j] <= 'Z') ?
    sha[j]+('a'-'A') : sha[j];
    funcname[42] = '\0';
    }

    /* Push the pcall error handler function on the stack. */
    lua_getglobal(lua, "__redis__err__handler");

    /* Try to lookup the Lua function */
    lua_getglobal(lua, funcname);
    if (lua_isnil(lua,-1)) {
    lua_pop(lua,1); /* remove the nil from the stack */
    /* Function not defined... let's define it if we have the
    * body of the function. If this is an EVALSHA call we can just
    * return an error. */
    if (evalsha) {
    lua_pop(lua,1); /* remove the error handler from the stack. */
    addReply(c, shared.noscripterr);
    return;
    }
    if (luaCreateFunction(c,lua,funcname,c->argv[1]) == C_ERR) {
    lua_pop(lua,1); /* remove the error handler from the stack. */
    /* The error is sent to the client by luaCreateFunction()
    * itself when it returns C_ERR. */
    return;
    }
    /* Now the following is guaranteed to return non nil */
    lua_getglobal(lua, funcname);
    serverAssert(!lua_isnil(lua,-1));
    }

    lua 中保存脚本 funcname 格式是 f_{evalsha hash} , 如果每一次执行,调用 luaCreateFunction 让 lua 虚拟机加载 user_script 脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    /* Populate the argv and keys table accordingly to the arguments that
    * EVAL received. */
    luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys);
    luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys);

    /* Select the right DB in the context of the Lua client */
    selectDb(server.lua_client,c->db->id);

    /* Set a hook in order to be able to stop the script execution if it
    * is running for too much time.
    * We set the hook only if the time limit is enabled as the hook will
    * make the Lua script execution slower.
    *
    * If we are debugging, we set instead a "line" hook so that the
    * debugger is call-back at every line executed by the script. */
    server.lua_caller = c;
    server.lua_time_start = mstime();
    server.lua_kill = 0;
    if (server.lua_time_limit > 0 && server.masterhost == NULL &&
    ldb.active == 0)
    {
    lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
    delhook = 1;
    } else if (ldb.active) {
    lua_sethook(server.lua,luaLdbLineHook,LUA_MASKLINE|LUA_MASKCOUNT,100000);
    delhook = 1;
    }

    /* At this point whether this script was never seen before or if it was
    * already defined, we can call it. We have zero arguments and expect
    * a single return value. */
    err = lua_pcall(lua,0,1,-2);

    /* Perform some cleanup that we need to do both on error and success. */
    if (delhook) lua_sethook(lua,NULL,0,0); /* Disable hook */
    if (server.lua_timedout) {
    server.lua_timedout = 0;
    /* Restore the readable handler that was unregistered when the
    * script timeout was detected. */
    aeCreateFileEvent(server.el,c->fd,AE_READABLE,
    readQueryFromClient,c);
    }
    server.lua_caller = NULL;

    luaSetGlobalArray KEYS , ARGS 以参数形式入栈,设置一堆 debug/slow call 相关的参数,最后 lua_pcall 执行用户脚本,lua 虚拟机执行脚本时,如果遇到 redis.call 就会回调 redis 函数 luaRedisCallCommand , 对应的 redis.pcall 执行 luaRedisPCallCommand 函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
        if (err) {
    addReplyErrorFormat(c,"Error running script (call to %s): %s\n",
    funcname, lua_tostring(lua,-1));
    lua_pop(lua,2); /* Consume the Lua reply and remove error handler. */
    } else {
    /* On success convert the Lua return value into Redis protocol, and
    * send it to * the client. */
    luaReplyToRedisReply(c,lua); /* Convert and consume the reply. */
    lua_pop(lua,1); /* Remove the error handler. */
    }

    /* If we are using single commands replication, emit EXEC if there
    * was at least a write. */
    if (server.lua_replicate_commands) {
    preventCommandPropagation(c);
    if (server.lua_multi_emitted) {
    robj *propargv[1];
    propargv[0] = createStringObject("EXEC",4);
    alsoPropagate(server.execCommand,c->db->id,propargv,1,
    PROPAGATE_AOF|PROPAGATE_REPL);
    decrRefCount(propargv[0]);
    }
    }

    ......
    if (evalsha && !server.lua_replicate_commands) {
    if (!replicationScriptCacheExists(c->argv[1]->ptr)) {
    /* This script is not in our script cache, replicate it as
    * EVAL, then add it into the script cache, as from now on
    * slaves and AOF know about it. */
    robj *script = dictFetchValue(server.lua_scripts,c->argv[1]->ptr);

    replicationScriptCacheAdd(c->argv[1]->ptr);
    serverAssertWithInfo(c,NULL,script != NULL);
    rewriteClientCommandArgument(c,0,
    resetRefCount(createStringObject("EVAL",4)));
    rewriteClientCommandArgument(c,1,script);
    forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
    }
    }
    }

    代码有点长,总体就是执行超时处理,生成 exec 用于复制,最后如果 replication 从库没有执行过这个 evlsha 脚本,并且当前模式不是 lua_always_replicate_commands 要把脚本真实内容也先同步到 replication

    这里还有最重要的是 luaReplyToRedisReply(c,lua); 将 lua 返回值,转换成 redis RESP 格式

    再来看一下 luaRedisGenericCommand 是如何调用 redis 函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    int luaRedisGenericCommand(lua_State *lua, int raise_error) {
    int j, argc = lua_gettop(lua);
    struct redisCommand *cmd;
    client *c = server.lua_client;
    sds reply;

    /* Cached across calls. */
    static robj **argv = NULL;
    static int argv_size = 0;
    static robj *cached_objects[LUA_CMD_OBJCACHE_SIZE];
    static size_t cached_objects_len[LUA_CMD_OBJCACHE_SIZE];
    static int inuse = 0; /* Recursive calls detection. */

    /* By using Lua debug hooks it is possible to trigger a recursive call
    * to luaRedisGenericCommand(), which normally should never happen.
    * To make this function reentrant is futile and makes it slower, but
    * we should at least detect such a misuse, and abort. */
    if (inuse) {
    char *recursion_warning =
    "luaRedisGenericCommand() recursive call detected. "
    "Are you doing funny stuff with Lua debug hooks?";
    serverLog(LL_WARNING,"%s",recursion_warning);
    luaPushError(lua,recursion_warning);
    return 1;
    }
    inuse++;

    /* Require at least one argument */
    if (argc == 0) {
    luaPushError(lua,
    "Please specify at least one argument for redis.call()");
    inuse--;
    return raise_error ? luaRaiseError(lua) : 1;
    }

    /* Build the arguments vector */
    if (argv_size < argc) {
    argv = zrealloc(argv,sizeof(robj*)*argc);
    argv_size = argc;
    }

    for (j = 0; j < argc; j++) {
    char *obj_s;
    size_t obj_len;
    char dbuf[64];

    if (lua_type(lua,j+1) == LUA_TNUMBER) {
    /* We can't use lua_tolstring() for number -> string conversion
    * since Lua uses a format specifier that loses precision. */
    lua_Number num = lua_tonumber(lua,j+1);

    obj_len = snprintf(dbuf,sizeof(dbuf),"%.17g",(double)num);
    obj_s = dbuf;
    } else {
    obj_s = (char*)lua_tolstring(lua,j+1,&obj_len);
    if (obj_s == NULL) break; /* Not a string. */
    }

    /* Try to use a cached object. */
    if (j < LUA_CMD_OBJCACHE_SIZE && cached_objects[j] &&
    cached_objects_len[j] >= obj_len)
    {
    sds s = cached_objects[j]->ptr;
    argv[j] = cached_objects[j];
    cached_objects[j] = NULL;
    memcpy(s,obj_s,obj_len+1);
    sdssetlen(s, obj_len);
    } else {
    argv[j] = createStringObject(obj_s, obj_len);
    }
    }

    /* Check if one of the arguments passed by the Lua script
    * is not a string or an integer (lua_isstring() return true for
    * integers as well). */
    if (j != argc) {
    j--;
    while (j >= 0) {
    decrRefCount(argv[j]);
    j--;
    }
    luaPushError(lua,
    "Lua redis() command arguments must be strings or integers");
    inuse--;
    return raise_error ? luaRaiseError(lua) : 1;
    }

    /* Setup our fake client for command execution */
    c->argv = argv;
    c->argc = argc;

    /* Log the command if debugging is active. */
    if (ldb.active && ldb.step) {
    sds cmdlog = sdsnew("<redis>");
    for (j = 0; j < c->argc; j++) {
    if (j == 10) {
    cmdlog = sdscatprintf(cmdlog," ... (%d more)",
    c->argc-j-1);
    } else {
    cmdlog = sdscatlen(cmdlog," ",1);
    cmdlog = sdscatsds(cmdlog,c->argv[j]->ptr);
    }
    }
    ldbLog(cmdlog);
    }

    这里的功能,主要是从 lua 虚拟机中获取 eval 脚本的参数,赋值给 redisClient, 为以后执行命令做准备

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /* Command lookup */
    cmd = lookupCommand(argv[0]->ptr);
    if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) ||
    (argc < -cmd->arity)))
    {
    if (cmd)
    luaPushError(lua,
    "Wrong number of args calling Redis command From Lua script");
    else
    luaPushError(lua,"Unknown Redis command called from Lua script");
    goto cleanup;
    }
    c->cmd = c->lastcmd = cmd;

    lookupCommand 查表,找到要执行的 redis 命令

    1
    2
    3
    4
    5
    /* There are commands that are not allowed inside scripts. */
    if (cmd->flags & CMD_NOSCRIPT) {
    luaPushError(lua, "This Redis command is not allowed from scripts");
    goto cleanup;
    }

    如果是不允许在 lua 中执行的命令,报错退出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    /* Write commands are forbidden against read-only slaves, or if a
    * command marked as non-deterministic was already called in the context
    * of this script. */
    if (cmd->flags & CMD_WRITE) {
    if (server.lua_random_dirty && !server.lua_replicate_commands) {
    luaPushError(lua,
    "Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
    goto cleanup;
    } else if (server.masterhost && server.repl_slave_ro &&
    !server.loading &&
    !(server.lua_caller->flags & CLIENT_MASTER))
    {
    luaPushError(lua, shared.roslaveerr->ptr);
    goto cleanup;
    } else if (server.stop_writes_on_bgsave_err &&
    server.saveparamslen > 0 &&
    server.lastbgsave_status == C_ERR)
    {
    luaPushError(lua, shared.bgsaveerr->ptr);
    goto cleanup;
    }
    }

    /* If we reached the memory limit configured via maxmemory, commands that
    * could enlarge the memory usage are not allowed, but only if this is the
    * first write in the context of this script, otherwise we can't stop
    * in the middle. */
    if (server.maxmemory && server.lua_write_dirty == 0 &&
    (cmd->flags & CMD_DENYOOM))
    {
    if (freeMemoryIfNeeded() == C_ERR) {
    luaPushError(lua, shared.oomerr->ptr);
    goto cleanup;
    }
    }

    if (cmd->flags & CMD_RANDOM) server.lua_random_dirty = 1;
    if (cmd->flags & CMD_WRITE) server.lua_write_dirty = 1;

    设置 cmd->flags

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /* If this is a Redis Cluster node, we need to make sure Lua is not
    * trying to access non-local keys, with the exception of commands
    * received from our master or when loading the AOF back in memory. */
    if (server.cluster_enabled && !server.loading &&
    !(server.lua_caller->flags & CLIENT_MASTER))
    {
    /* Duplicate relevant flags in the lua client. */
    c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
    c->flags |= server.lua_caller->flags & (CLIENT_READONLY|CLIENT_ASKING);
    if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL) !=
    server.cluster->myself)
    {
    luaPushError(lua,
    "Lua script attempted to access a non local key in a "
    "cluster node");
    goto cleanup;
    }
    }

    如果是 cluster 模式,要保证 lua 的 keys 所在的 slots 必须在本地 shard

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /* If we are using single commands replication, we need to wrap what
    * we propagate into a MULTI/EXEC block, so that it will be atomic like
    * a Lua script in the context of AOF and slaves. */
    if (server.lua_replicate_commands &&
    !server.lua_multi_emitted &&
    server.lua_write_dirty &&
    server.lua_repl != PROPAGATE_NONE)
    {
    execCommandPropagateMulti(server.lua_caller);
    server.lua_multi_emitted = 1;
    }

    如果是 effect replication 模式,生成 multi 事务命令用于复制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /* Run the command */
    int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
    if (server.lua_replicate_commands) {
    /* Set flags according to redis.set_repl() settings. */
    if (server.lua_repl & PROPAGATE_AOF)
    call_flags |= CMD_CALL_PROPAGATE_AOF;
    if (server.lua_repl & PROPAGATE_REPL)
    call_flags |= CMD_CALL_PROPAGATE_REPL;
    }
    call(c,call_flags);

    这里才去真正的执行命令, call_flags 参数用于控制是否复制,是否生成 AOF 等等

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
        /* Convert the result of the Redis command into a suitable Lua type.
    * The first thing we need is to create a single string from the client
    * output buffers. */
    if (listLength(c->reply) == 0 && c->bufpos < PROTO_REPLY_CHUNK_BYTES) {
    /* This is a fast path for the common case of a reply inside the
    * client static buffer. Don't create an SDS string but just use
    * the client buffer directly. */
    c->buf[c->bufpos] = '\0';
    reply = c->buf;
    c->bufpos = 0;
    } else {
    reply = sdsnewlen(c->buf,c->bufpos);
    c->bufpos = 0;
    while(listLength(c->reply)) {
    robj *o = listNodeValue(listFirst(c->reply));

    reply = sdscatlen(reply,o->ptr,sdslen(o->ptr));
    listDelNode(c->reply,listFirst(c->reply));
    }
    }
    if (raise_error && reply[0] != '-') raise_error = 0;
    redisProtocolToLuaType(lua,reply);
    ......
    }

    redisProtocolToLuaType 把 redis 结果转换成 lua 类型返回给 lua 虚拟机