Golang的Redis客户端
在项目开发的时候有木有纠结过,该选哪个库做为链接Redis用。如果项目用不到Redis,可以忽略。
写作目的
主要是针对golang的两个库:go-redis 和 redigo 。
虽然这两个库我在项目都使用过,结合项目场景来说下各自的区别。
个人比较倾向使用go-redis 。
解读两个库
go-redis | redigo | |
|---|---|---|
| 代码库 | github.com/go-redis/redis/v7 | github.com/gomodule/redigo |
| 连接 | 内建智能连接池(NewClient初始化自动启用)。自动回收空闲连接,默认参数适用多数场景。 健康检查:支持周期性Ping保活。 | 需显式配置redis.Pool并手动管理连接获取/归还。需手动设置 MaxIdle/MaxActive等参数精细控制健康检查:需配置 TestOnBorrow函数手动实现 |
| 风格 | 类型安全与封装友好: 结构化命令封装:go-redis为每个Redis命令设计了独立方法(如 rdb.Set())。类型统一化设计:将Redis返回数据统一为 Result接口类型,通过类型断言提取值。上下文集成:所有操作支持context.Context参数。 | 原生接口与轻量透明: 极简的接口(Do/Send):通过统一的Do(cmd string, args …interface{})方法执行所有命令。 显式类型转换:返回值处理需结合redis.String、redis.Int等函数手工转换。 轻量无依赖:代码精炼,无外部依赖。 |
| 命令执行 | 链式调用:支持Set().Err()或Set().Result()形式。 错误类型识别:预定义如redis.Nil(表示Key不存在)等错误类型 | 错误与结果混合:result, err := conn.Do(“GET”, “key”)需每次检查错误。 类型安全缺失:Do方法返回interface{}需配合redis.String等转换,转换错误需额外处理。 |
| Pipeline | 提供TxPipeline()和Pipeline()封装,支持链式调用并自动管理命令批量提交。 | 需组合Send()、Flush()、Receive()方法手动控制,灵活性高但易出错。 |
| (Pub/Sub) | 提供专用PubSubConn类型简化订阅循环管理。 | 抽象层级更高,支持Subscribe()返回*PubSub对象配合Channel消费。 |
代码详细说明对比(连接、风格、命令执行)
- go-redis
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 type RedisOption struct { Addr string Db int Username string // 非必填 Password string // userName:password } func NewRedisClient(opt RedisOption) (*redis.Client, error) { //可以直接ip端口+db+认证,源码内实际就支持了连接池 clt := redis.NewClient(&redis.Options{ Addr: opt.Addr, // Redis服务器地址 DB: opt.Db, // 默认DB Password: opt.Password, // 密码(无密码留空) DialTimeout: 5 * time.Second, //源码也是5s超时 ReadTimeout: 5 * time.Second, //源码内是3s超时 WriteTimeout: 5 * time.Second, //源码内默认跟ReadTimeout一致 PoolSize: 100, //源码是5倍的cpu数 }) if err := clt.Ping().Err(); err != nil { //todo error log or waring return nil, err } return ctl, nil } //rdsClient:= NewRedisClient() //rdsClient.Incr("abc").Result()源码解读eg:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 //文件commands.go //Incr 对key进行自增,返回一个 IntCmd对象,如果我们读取key的值,会用到StringCmd //优势的OOD设计思想 func (c cmdable) Incr(key string) *IntCmd { cmd := NewIntCmd("incr", key) _ = c(cmd) return cmd } ... //文件command.go //Result IntCmd 的Result() 返回两个值 func (cmd *IntCmd) Result() (int64, error) { return cmd.val, cmd.err }以上可以看的出来
go-redis在使用上的连接方式、代码风格、命令执行过程中的优势。
- redigo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 type redisConfig struct { ComPrefix string MaxActive int MaxIdle int IdleTimeout int Address string Passwd string Name string //数据库实例唯一名称 DBIndex int //同个Address下不同的DBIndex Prefix string //若此值为""则采用RedisCommon的CommonPrefix } type RedisInstance struct { Name string DSN string prefix string Pool *redis.Pool } var redisConn = make(map[string]*RedisInstance) func initRedisPool(dbCfg *redisConfig) (*RedisInstance, error) { if dbCfg == nil { return nil, errors.New("redis config is nil") } prefix := dbCfg.ComPrefix if 0 < len(dbCfg.Prefix) { prefix = dbCfg.Prefix } // 建立连接池 rdsInstance := &RedisInstance{ Name: dbCfg.Name, prefix: prefix, DSN: fmt.Sprintf("%s DBIndex:%d", dbCfg.Address, dbCfg.DBIndex), Pool: &redis.Pool{ MaxIdle: dbCfg.MaxIdle, MaxActive: dbCfg.MaxActive, IdleTimeout: time.Duration(dbCfg.IdleTimeout) * time.Second, Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", dbCfg.Address, redis.DialReadTimeout(time.Duration(500)*time.Millisecond), redis.DialWriteTimeout(time.Duration(500)*time.Millisecond), redis.DialConnectTimeout(time.Duration(1000)*time.Millisecond), ) if err != nil { return nil, err } if dbCfg.Passwd != "" { if _, err := c.Do("AUTH", dbCfg.Passwd); err != nil { c.Close() return nil, err } } if dbCfg.DBIndex != 0 { if _, err := c.Do("SELECT", dbCfg.DBIndex); err != nil { c.Close() return nil, err } } return c, nil }, }, } redisConn[dbCfg.Name] = rdsInstance return rdsInstance, nil } //封装一个从连接池内部获取一个可用链接 const getRedisMaxRetry=3 func (r *RedisInstance) getConn(retryTimes int) (redis.Conn, error) { if getRedisMaxRetry <= retryTimes { err := errors.New("empty conns") if err != nil { logx.Errorf("redis pool init fail, err= %v", err) } return nil, err } conn := r.Pool.Get() err := conn.Err() if err != nil { if retryTimes == getRedisMaxRetry-1 { // 最后一次尝试sleep 100ms time.Sleep(100 * time.Millisecond) } else { // sleep 5ms time.Sleep(5 * time.Millisecond) } retryTimes++ logx.Errorf("redis pool get conn fail, err= %v", err) conn.Close() return r.getConn(retryTimes) } else { return conn, nil } } //项目中封装一个统一使用redisgo的Do的方法 func (r *RedisInstance) Do(ctx context.Context, commandName string, args ...interface{}) (reply interface{}, err error) { conn, err := r.getConn(0) if conn == nil || err != nil { return nil, err } defer conn.Close() //还需要手动去释放该链接 now := time.Now() reply, e := conn.Do(commandName, args...) costMs := time.Since(now).Milliseconds() if e != nil { return reply, errors.New(e.Error() + ", cost time: " + strconv.Itoa(int(costMs)) + " ms") } return reply, e } //封装了一个类似go-redis内部的Incr方法,并且返回类似*IntCmd.Result的结果。 func (r *RedisInstance) Incr(ctx context.Context, key string) (val int64, err error) { return redis.Int64(r.Do(ctx, "INCR", key)) } //例如我还获取key内容的,仍然需要统一封装才可以简化代码量 func (r *RedisInstance) GetString(ctx context.Context, key string) (string, error) { return redis.String(r.Do(ctx, "GET", key)) } //redisConn["uc"].Incr(context.Background(), key string)通过以上代码,我们可以看出在使用redigo的时候,我们可以灵活的对底层封装(同时也带来了不便性,需要我们项目内部进行二次封装)。
我个人倾向于go-redis的设计思想。
Pipeline(管道)的使用
为什么要用到Pipeline?
答:
Pipeline是Redis用来优化网络请求的一种机制,主要解决频繁命令往返导致的延迟问题。
比如每个命令单独发送时,客户端发送请求后要等待服务器响应才能继续下一个,这样多次往返的延迟累加起来会影响性能,特别是命令数量大的时候。
Pipeline的工作原理是客户端可以将多个命令打包一次性发送,服务器按顺序执行后一次性返回所有结果。这样减少了网络往返时间(RTT),提升吞吐量。
虽然执行顺序有保证,但中间可能被其他客户端的命令插入,所以不能保证原子性。如果需要原子性,应该用事务(MULTI/EXEC)。Pipeline适合需要批量操作但不要求原子性的情况,比如批量设置或获取多个键值。
核心思想是减少网络往返次数(Round Trip Time - RTT)。
非原子性、顺序性保证、资源占用(命令数量巨大或结果数据量巨大,可能会消耗较多的服务器内存)、错误处理-不会影响队列中后续命令的执行。
go-redis
1 2 3 4 5 6 7 8
//rdsClient:= NewRedisClient() pipeline := rdsClient.Pipeline() pipeline.HGet(key, "oneKey") pipeline.HGet(key, "anotherKey") cmderList, err := pipeline.Exec() oneVal:=cmderList[0].(*redis.StringCmd).Val() anotherVal:=cmderList[1].(*redis.StringCmd).Val()
redigo
1 2 3 4 5 6 7
//onePool:=RedisInstance.Pool.Get() // defer onePool.Close() onePool.Send(cmd1,arg1) onePool.Send(cmd1,arg1) conn.Flush() r1,_:=conn.Receive() r2,_:=conn.Receive()
在使用上go-redis 无需关注连接池获取和释放,只关注业务本身即可。但是在redigo更多的还是在扩展性上延伸的更广。
发布和订阅(Pub/Sub)
Redis 的 Pub/Sub(发布/订阅) 是一种消息通信模式,允许消息的发送者(发布者)将消息发送到特定的频道(channel),而订阅了该频道的接收者(订阅者)会实时接收到这些消息。这种模式实现了松耦合的消息传递,发布者和订阅者不需要知道彼此的存在,只需关注频道本身。
通俗的说:如果你想象你的程序含有类似FM收音机那种功能(实时接收不同频段的消息),就可以用redis的pub/sub。pub当做电台来用,sub当做收音机。可以实现进程间的实时通信。
特点:实时性、无持久化(消息丢失风险)、广播机制、动态订阅/退订、支持通配符订阅、
由于实时性和无持久化特点,产生了无背压机制(消费端也就无法告知消费端调整发送消息的速率),不会产生堆积。
若要类似kakfa那种消息可以回溯的,请使用Redis Stream。
go-redis
1 2 3 4 5 6 7 8
//rdsClient:= NewRedisClient() //rdsClient.Publish(channel , message ) // Publish posts the message to the channel. func (c cmdable) Publish(channel string, message interface{}) *IntCmd { cmd := NewIntCmd("publish", channel, message) _ = c(cmd) return cmd }
订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
func (m *pubSubDMO) Subscribe(channels []string, fn func(channel string, data []byte)) { //rdsClient:= NewRedisClient() subRedis := m.getGoRedis() if subRedis == nil { //未配置sub redis return } mySub := subRedis.Subscribe(channels...) // 指定一个缓冲区,大小1024 // 仅仅是表维度的更新,不会超过1024张 for msg := range mySub.ChannelSize(1024) { //pubSubDMO.Subscribe channel[%s],payload[%s]", msg.Channel, msg.Payload) fn(msg.Channel, []byte(msg.Payload)) } } ////使用redis消息订阅方式,取出消息,交给分发器,让分发器自行对接内部业务 // go PubSubDMO.Subscribe([]string{defines.RedisPubSubChannel}, func(channel string, message []byte) { // var data mpubsub.PubSubSubject // json.Unmarshal(message, &data) // mpubsub.Push(data) // })
通过PubSub类型和Channel()方法返回一个Go通道(chan *Message),用户可以直接在通道上循环读取消息。这种API更加符合Go语言的惯用法,简化了消息处理流程。
内置了重连机制,可以自动处理连接中断和重连。
redigo
1 redis.Int(r.Do(ctx, "PUBLISH", channel, message))订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 //onePool:=RedisInstance.Pool.Get() // defer onePool.Close() for { onePool := pool.Get() psc := redis.PubSubConn{Conn: onePool} psc.Subscribe("channel1") for { switch v := psc.Receive().(type) { case error: psc.Close() // 关闭当前连接 time.Sleep(5 * time.Second) break // 跳出内层循环,触发外层重连 // ... 其他类型处理 } } }需要用户自己实现重连逻辑。
当遇到网络中断或 Redis 重启时,需自动重连。或者该链接已经超过。
这是一种较低级别的API,需要用户手动处理连接和重连逻辑。
总结
两者对比:redigo更锻炼工程师在项目编写时关于错误处理、重连技巧上的开发;go-redis有简洁的API,可以让工程师花更多的时间留给对业务需求的设计。