|   1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
 | local json = loadMod("cjson")
local redis = loadMod("resty.redis")
local util = loadMod("core.util")
local exception = loadMod("core.exception")
local counter = loadMod("core.counter")
local dbConf = loadMod("config.redis")
--- 需要重试的错误对照表
local RETRY_ERRMSG_MAP = {
    ["broken pipe"] = true,
    ["timeout"] = true,
    ["closed"] = true,
}
--- Redis工具类
local Redis = {}
--- 获取连接
--
-- @param boolean reset 重置(强制重连)
-- @return resty.redis Redis连接
-- @return string      错误信息
local function getClient(reset)
    local client = ngx.ctx[Redis]
    if reset or not client then
        if client then
            client:close()
        end
        -- 新建连接
        local errmsg
        client, errmsg = redis:new()
        if not client then
            return nil, errmsg
        end
        -- 设置超时
        client:set_timeout(dbConf.TIMEOUT)
        -- 连接服务器
        local ok, errmsg
        local options = {}
        if dbConf.SOCK then
            ok, errmsg = client:connect("unix:" .. dbConf.SOCK, options)
        else
            ok, errmsg = client:connect(dbConf.HOST, dbConf.PORT, options)
        end
        if not ok then
            return nil, errmsg
        end
        ngx.ctx[Redis] = client
    end
    return client
end
--- 关闭连接
local function closeClient()
    local client = ngx.ctx[Redis]
    if client then
        client:set_keepalive(dbConf.TIMEOUT, dbConf.POOL_SIZE)
        ngx.ctx[Redis] = nil
    end
end
--- 转化null为nil
--
-- @param mixed value
-- @return mixed
local function nul2nil(value)
    if value == ngx.null then
        return nil
    end
    return value
end
--- 将任意值编码为格式字符串
--
-- @param mixed value
-- @return string
local function encode(value)
    if util:isNumber(value) then
        return value
    else
        json.encode_sparse_array(true)
        return "*" .. json.encode(value)
    end
end
--- 将格式字符串解码为值
--
-- @param string value
-- @return mixed
local function decode(value)
    if nul2nil(value) == nil then
        return nil
    end
    if util:isNumber(value) then
        return value
    else
        local flag = value:sub(1, 1)
        if flag == "*" then
            return json.decode(value:sub(2))
        end
        return value
    end
end
--- 执行命令
--
-- @param string cmd 命令
-- @param mixed ... 命令参数
-- @return mixed 命令结果
function Redis:execute(cmd, ...)
    counter:set(counter.COUNTER_REDIS_COMMAND)
    local client, results, errmsg
    for i = 1, dbConf.RETRY_TIMES do
        client, errmsg = getClient(i > 1)
        if client then
            if cmd == "select" or not client[cmd] then
                exception:raise("core.badCall", { cmd = cmd, args = { ... } })
            end
            client:init_pipeline()
            client:select(self.dbIndex)
            client[cmd](client, ...)
            results, errmsg = client:commit_pipeline()
        end
        if results or not RETRY_ERRMSG_MAP[errmsg] then
            break
        end
    end
    if not results or not util:isTable(results) or #results ~= 2 then
        exception:raise("core.queryFailed", { args = { ... }, message = errmsg })
    end
    local selectRet, cmdRet = unpack(results)
    if not selectRet or (util:isTable(selectRet) and not selectRet[1]) then
        exception:raise("core.queryFailed", { cmd = "select", args = { self.dbIndex }, message = selectRet[2] })
    end
    if not cmdRet then
        exception:raise("core.queryFailed", { cmd = cmd, args = { ... }, message = cmdRet[2] })
    end
    return cmdRet
end
--- 实例工厂
local Module = { instances = {} }
--- 获取查询对象实例
--
-- @param number dbIndex 数据库索引
-- @return table 查询对象
function Module:getInstance(dbIndex)
    if not self.instances[dbIndex] then
        self.instances[dbIndex] = util:inherit({ dbIndex = dbIndex }, Redis)
    end
    return self.instances[dbIndex]
end
--- 关闭连接
function Module:close()
    closeClient()
end
return Module
 |