OpenResty 中的 Redis 使用技巧

前言

  Redis 是非常流行的 NoSQL 数据库之一,因其高效、稳定、开源、数据结构丰富,深受业内钦赖。很幸运,OpenResty 也内置了对 Redis 的支持。
  在使用 OpenResty 的过程中,Redis 一直是主力存储方式之一。虽然 OpenResty 内置了 Redis 驱动,但在实际项目中,对其进行进一步的封装,能更方便的使用和管理。
  在这个过程中,遇到了很多需要注意的点,也积累了一些技巧和经验,在此总结一下。

技巧

单例模式

  单例模式是对数据连接或数据查询工具类的常用处理模式。保持数据连接的单例好处很多,最重要的一点是确保了同一连接的复用,不会被重复打开浪费资源,也方便了连接的管理和状态维护。
  在 OpenResty 中,可以通过把 resty.redis:new() 产生的对象保存到 ngx.ctx 中实现单例。同一请求可以通过 ngx.ctx 共享已打开的连接。

连接池

  很多高级语言,例如 JavaC++,都支持连接池特性。在使用完数据连接后,将连接归还给连接池,而不是关闭连接。下次连接时,会尝试复用连接池中的连接。在高并发下,能大大减少建立和断开连接的次数,从而大大的节省系统资源。
  OpenRestycosocket 也支持连接池特性,而基于 cosocketMySQLRedis 驱动也顺理成章的支持了连接池特性。
  在 OpenResty 中,在需要关闭连接时,使用 client:set_keepalive(TIMEOUT, POOL_SIZE) 代替 client:close(),即可激活连接池特性。

尽量使用 Unix 套接字

  OpenResty 内置的 Redis 驱动连接 Redis 服务器有两种方式,一种是使用 IP 和端口,一种是使用 Unix 套接字。
  如果 Redis 服务器和 OpenResty 服务在同一物理服务器上,则应优先使用 Unix 套接字模式连接。经过实际测试,使用 Unix 套接字模式相较于使用 IP 端口模式,速度能够提升 10% ~ 15%
  

数据隔离 

  Redis 经常被用于数据缓存,在一个项目中,需要缓存的数据种类很多。几十种甚至上百种,都是现实中会遇到的情况。
  Redis 在没有调用 select 命令时,会默认将数据存储到 dbindex 0 的数据库。显然这样对缓存管理是很不利的。
  想象一下这样一个场景,用户表因为一些原因需要进行批量更新,这时为了缓存的一致性,我们需要清理用户数据缓存。如果所有的缓存都混杂在一起,这显然是个令人头疼的任务。
  所以,请将不同种类的数据,存储在不同的 dbindex 中,这样不管是查询还是管理,都会很方便,我们可以毫不费力的清除某一类数据,而不影响其他数据。
  缓存数据和非缓存数据更是要隔离开来,这样才不会在后续的维护工作中给自己带来麻烦。
  当然,这里需要一个技巧,否则每次查询之前都需要执行一次 select 命令,这可不是什么好主意。我们可以利用 Redis 中的pipeline 特性,将 select 和我们要执行的命令打包在一起一次发送。

编码存储

  默认情况下,Redis 只能存储 numberstringbool,而不能存储 table,这显然不符合我们的预期。
  这也很容易解决,对 table 进行编码,变成 string 自然就可以存储了。当然,为了兼容性,我们需要对所有存进去和取出来的值进行判断,以确定是不是需要编码解码,这会带来一些额外的性能消耗。
  但可以直接保存 table 实在是一件很美妙的事,付出一些额外的代价也是很值得的。
  关于编码方式,拥有的选择实在不多,考虑到 cjson 是内置模块,且 json 可读性较高,虽然编码效率并不尽人意,但是我还是选择了使用 json 来编码。

重试

  理论上来说,为了防止数据被重复处理,所有的错误都应该被抛出。但在实际使用中,一些网络错误导致的执行查询失败,应该进行重试,这样在一些短暂可恢复的故障中,用户是无感的。
  重试的针对的错误应该明确而最小化,确保不会因为重试导致数据污染。重试应该有较小的次数上限,让不能恢复的故障能够被尽快抛出。
  重试应该被加入到日志和统计数据中,通过查询分析这些数据,能够及时察觉数据库在稳定性和可靠性上的隐患。

示例代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
local json = loadMod("cjson")
local redis = loadMod("resty.redis")
local util = loadMod("core.util")
local exception = loadMod("core.exception")
local counter = loadMod("core.counter")
local dbConf = loadMod("config.redis")

--- 需要重试的错误对照表
local RETRY_ERRMSG_MAP = {
    ["broken pipe"] = true,
    ["timeout"] = true,
    ["closed"] = true,
}

--- Redis工具类
local Redis = {}

--- 获取连接
--
-- @param boolean reset 重置(强制重连)
-- @return resty.redis Redis连接
-- @return string      错误信息
local function getClient(reset)
    local client = ngx.ctx[Redis]

    if reset or not client then
        if client then
            client:close()
        end

        -- 新建连接
        local errmsg
        client, errmsg = redis:new()

        if not client then
            return nil, errmsg
        end

        -- 设置超时
        client:set_timeout(dbConf.TIMEOUT)

        -- 连接服务器
        local ok, errmsg
        local options = {}

        if dbConf.SOCK then
            ok, errmsg = client:connect("unix:" .. dbConf.SOCK, options)
        else
            ok, errmsg = client:connect(dbConf.HOST, dbConf.PORT, options)
        end

        if not ok then
            return nil, errmsg
        end

        ngx.ctx[Redis] = client
    end

    return client
end

--- 关闭连接
local function closeClient()
    local client = ngx.ctx[Redis]

    if client then
        client:set_keepalive(dbConf.TIMEOUT, dbConf.POOL_SIZE)
        ngx.ctx[Redis] = nil
    end
end

--- 转化null为nil
--
-- @param mixed value
-- @return mixed
local function nul2nil(value)
    if value == ngx.null then
        return nil
    end

    return value
end

--- 将任意值编码为格式字符串
--
-- @param mixed value
-- @return string
local function encode(value)
    if util:isNumber(value) then
        return value
    else
        json.encode_sparse_array(true)
        return "*" .. json.encode(value)
    end
end

--- 将格式字符串解码为值
--
-- @param string value
-- @return mixed
local function decode(value)
    if nul2nil(value) == nil then
        return nil
    end

    if util:isNumber(value) then
        return value
    else
        local flag = value:sub(1, 1)

        if flag == "*" then
            return json.decode(value:sub(2))
        end

        return value
    end
end

--- 执行命令
--
-- @param string cmd 命令
-- @param mixed ... 命令参数
-- @return mixed 命令结果
function Redis:execute(cmd, ...)
    counter:set(counter.COUNTER_REDIS_COMMAND)

    local client, results, errmsg

    for i = 1, dbConf.RETRY_TIMES do
        client, errmsg = getClient(i > 1)

        if client then
            if cmd == "select" or not client[cmd] then
                exception:raise("core.badCall", { cmd = cmd, args = { ... } })
            end

            client:init_pipeline()
            client:select(self.dbIndex)
            client[cmd](client, ...)

            results, errmsg = client:commit_pipeline()
        end

        if results or not RETRY_ERRMSG_MAP[errmsg] then
            break
        end
    end

    if not results or not util:isTable(results) or #results ~= 2 then
        exception:raise("core.queryFailed", { args = { ... }, message = errmsg })
    end

    local selectRet, cmdRet = unpack(results)

    if not selectRet or (util:isTable(selectRet) and not selectRet[1]) then
        exception:raise("core.queryFailed", { cmd = "select", args = { self.dbIndex }, message = selectRet[2] })
    end

    if not cmdRet then
        exception:raise("core.queryFailed", { cmd = cmd, args = { ... }, message = cmdRet[2] })
    end

    return cmdRet
end


--- 实例工厂
local Module = { instances = {} }

--- 获取查询对象实例
--
-- @param number dbIndex 数据库索引
-- @return table 查询对象
function Module:getInstance(dbIndex)
    if not self.instances[dbIndex] then
        self.instances[dbIndex] = util:inherit({ dbIndex = dbIndex }, Redis)
    end

    return self.instances[dbIndex]
end

--- 关闭连接
function Module:close()
    closeClient()
end

return Module