Redis面试八股文-Redis实现分布式锁和红锁

背景

什么实际场景下需要使用分布式锁,为什么?

分布式锁在以下实际场景中经常需要使用:

缓存同步:在分布式系统中,多个节点共享同一个缓存。当缓存失效时,多个节点可能会同时触发对后端数据源的请求。使用分布式锁可以保证只有一个节点去加载数据到缓存中,避免缓存击穿和缓存雪崩问题,提高系统性能和稳定性。

并发任务控制:在分布式任务调度系统中,多个节点可能会争夺执行任务的权限。使用分布式锁可以确保只有一个节点获得任务执行权限,避免重复执行相同的任务或者任务冲突。

数据库操作:在分布式系统中,多个服务同时对同一个数据库进行写操作时,可能会引发数据不一致的问题。使用分布式锁可以保证只有一个服务可以执行数据库事务,确保数据的一致性和完整性。

分布式资源控制:在分布式系统中,可能存在共享资源(如文件、网络连接、设备等)需要被多个节点同时访问。使用分布式锁可以控制对这些资源的互斥访问,避免冲突和资源竞争。

全局唯一性约束:在分布式系统中,需要生成全局唯一的标识符(如全局唯一订单号、全局唯一ID等)。使用分布式锁可以确保在高并发情况下生成全局唯一的标识符,避免重复和冲突。

分布式事务:在分布式系统中,涉及多个服务对多个数据源进行修改操作,并需要保证这些修改的原子性和一致性。使用分布式锁可以控制分布式事务的并发访问,保证事务的正确执行。

所以,分布式锁在分布式系统中用于解决并发访问共享资源的问题,确保数据的一致性、避免冲突和竞争,保证系统的可靠性和正确性。它是实现分布式系统中关键的机制之一。

常见实现方案

以下是几种常见的分布式锁实现方案:

基于数据库的实现:使用数据库的事务和唯一性约束来实现分布式锁。通过在数据库中创建一张表或者使用特定的锁表记录锁的状态,使用数据库的事务和行级锁来确保只有一个节点可以获取到锁。

基于Redis的实现:Redis是一种高性能的键值存储系统,常被用于实现分布式锁。通过在Redis中使用SETNX命令(或者其它带有过期时间的命令)来尝试获取锁,成功获取到锁的节点可以执行相应的操作,其他节点则等待。

基于ZooKeeper的实现:ZooKeeper是一个开源的分布式协调服务,它提供了有序节点和锁机制。通过创建一个有序的临时节点,每个节点尝试创建自己的临时节点,然后通过判断自己是否为最小节点来决定是否获取到锁。

本文主要讨论使用Redis的SETNX进行实现。

分布式锁

原理

Redis里有一个设置如果不存在的命令,我们可以通过这个命令结合Redis的单线程特性来实现互斥锁功能,在Redis官方文档里面推荐的标准实现方式是SET key value NX PX 8000这串命令,其中:

key表示要锁定的资源名
NX表示如果不存在则设置
PX 8000表示过期时间为8000毫秒,也就是8秒
value这个值在所有的客户端必须是唯一的,所有同一key的锁竞争者这个值都不能一样。

值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:
只有key存在并且存储的值和我指定的值一样才能告诉我删除成功,避免错误释放别的竞争者的锁。
由于涉及到两个操作,因此我们需要通过Lua脚本保证操作的原子性:

1
2
3
4
5
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end

举个不用Lua脚本的例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。
因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。

续期策略或无法获取锁时进行容错?

我们前面的例子中提到的互斥锁有一个小问题,就是如果持有锁客户端A被阻塞,那么A的锁可能会超时被自动释放,导致客户端B提前获取到锁。
为了减少这种情况的发生,我们可以在A持有锁期间,不断地延长锁的过期时间,减少客户端B提前获取到锁的情况。
当然,这里我们不可能在某个客户端无限续期,否则其他的客户端将永远无法获取到锁然后执行后续操作。
所以,这里我们要根据具体的业务场景设定续期次数,或在客户端一定时间获取不到锁以后进行容错操作。

锁续期脚本:

1
2
3
4
5
6
7
var touchScript = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    else
        return 0
    end
`

具体实现

constant.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package rmutex

import (
    "crypto/rand"
    "encoding/base64"
    "fmt"
)

const (
    minRetryDelayMilliSec = 50
    maxRetryDelayMilliSec = 250
)

func genValue() string {
    b := make([]byte, 16)
    _, err := rand.Read(b)
    if err != nil {
        return ""
    }
    result := base64.StdEncoding.EncodeToString(b)
    fmt.Println(result)
    return result
}

var deleteScript = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
`


var touchScript = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    else
        return 0
    end
`

errors.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package rmutex

import (
    "errors"
    "fmt"
)

// ErrFailed 获取锁失败,且用掉所有重试次数
var ErrFailed = errors.New("rmutex: failed to acquire lock")

// ErrExtendFailed 无法延长锁的持有时间
var ErrExtendFailed = errors.New("rmutex: failed to extend lock")

// ErrTaken happens when the lock is already taken in a quorum on nodes.
type ErrTaken struct {
    Nodes []int
}

func (err ErrTaken) Error() string {
    return fmt.Sprintf("lock already taken, locked nodes: %v", err.Nodes)
}

// ErrNodeTaken is the error resulting if the lock is already taken in one of
// the cluster's nodes
type ErrNodeTaken struct {
    Node int
}

func (err ErrNodeTaken) Error() string {
    return fmt.Sprintf("node #%d: lock already taken", err.Node)
}

// A RedisError is an error communicating with one of the Redis nodes.
type RedisError struct {
    Node int
    Err  error
}

func (err RedisError) Error() string {
    return fmt.Sprintf("node #%d: %s", err.Node, err.Err)
}

rmutex.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package rmutex

import (
    "context"
    "errors"
    "math/rand"
    "time"

    "github.com/redis/go-redis/v9"
)

type DelayFunc func(tries int) time.Duration

type Mutex struct {
    name   string
    expiry time.Duration

    tries     int
    delayFunc DelayFunc

    genValueFunc func() string
    value        string
    until        time.Time

    quorum int

    driftFactor   float64
    timeoutFactor float64

    clients []*redis.Client // Redis客户端
}

func NewMutex(name string, clients []*redis.Client, expiry time.Duration, tries int, genValueFunc func() string) *Mutex {
    m := &Mutex{
        name:   name,
        expiry: expiry,
        tries:  tries,
        delayFunc: func(tries int) time.Duration {
            source := (maxRetryDelayMilliSec - minRetryDelayMilliSec) + minRetryDelayMilliSec
            return time.Duration(rand.Intn(source)) * time.Millisecond
        },
        genValueFunc:  genValueFunc,
        value:         "",
        until:         time.Time{},
        quorum:        len(clients)/2 + 1,
        driftFactor:   0.01,
        timeoutFactor: 0.05,
        clients:       clients,
    }

    if genValueFunc != nil {
        m.genValueFunc = genValueFunc
    } else {
        m.genValueFunc = genValue
    }

    m.value = m.genValueFunc()

    return m
}

func (m *Mutex) Name() string {
    return m.name
}

func (m *Mutex) Value() string {
    return m.value
}

func (m *Mutex) Until() time.Time {
    return m.until
}

func (m *Mutex) Lock() error {
    return m.LockContext(context.Background())
}

func (m *Mutex) LockContext(ctx context.Context) error {
    if ctx == nil {
        ctx = context.Background()
    }

    for i := 0; i < m.tries; i++ {
        if i != 0 {
            select {
            case <-ctx.Done():
                // Exit early if the context is done.
                return ErrFailed
            case <-time.After(m.delayFunc(i)):
                // Fall-through when the delay timer completes.
            }
        }

        start := time.Now()

        n, err := func() (int, error) {
            ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
            defer cancel()
            return m.actOnPoolsAsync(func(client *redis.Client) (bool, error) {
                return m.acquire(ctx, client)
            })
        }()

        now := time.Now()
        until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
        if n >= m.quorum && now.Before(until) {
            m.until = until
            return nil
        }

        func() (int, error) {
            ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
            defer cancel()
            return m.actOnPoolsAsync(func(client *redis.Client) (bool, error) {
                return m.release(ctx, client)
            })
        }()
        if i == m.tries-1 && err != nil {
            return err
        }
    }

    return ErrFailed
}

func (m *Mutex) acquire(ctx context.Context, client *redis.Client) (bool, error) {
    reply, err := client.SetNX(ctx, m.name, m.value, m.expiry).Result()
    if err != nil {
        return false, err
    }
    return reply, nil
}

// Unlock unlocks m and returns the status of unlock.
func (m *Mutex) Unlock() (bool, error) {
    return m.UnlockContext(context.Background())
}

// UnlockContext unlocks m and returns the status of unlock.
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
    n, err := m.actOnPoolsAsync(func(client *redis.Client) (bool, error) {
        return m.release(ctx, client)
    })
    if n < m.quorum {
        return false, err
    }
    return true, nil
}

func (m *Mutex) release(ctx context.Context, client *redis.Client) (bool, error) {
    status, err := client.Eval(ctx, deleteScript, []string{m.name}, m.value).Result()
    if err != nil {
        return false, err
    }
    return status != int64(0), nil
}

func (m *Mutex) Extend() (bool, error) {
    return m.ExtendContext(context.Background())
}

func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
    start := time.Now()
    n, err := m.actOnPoolsAsync(func(client *redis.Client) (bool, error) {
        return m.touch(ctx, client, int(m.expiry/time.Millisecond))
    })
    if n < m.quorum {
        return false, err
    }
    now := time.Now()
    until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
    if now.Before(until) {
        m.until = until
        return true, nil
    }
    return false, ErrExtendFailed
}

// Valid returns true if the lock acquired through m is still valid. It may
// also return true erroneously if quorum is achieved during the call and at
// least one node then takes long enough to respond for the lock to expire.
//
// Deprecated: Use Until instead. See https://github.com/go-redsync/redsync/issues/72.
func (m *Mutex) Valid() (bool, error) {
    return m.ValidContext(context.Background())
}

// ValidContext returns true if the lock acquired through m is still valid. It may
// also return true erroneously if quorum is achieved during the call and at
// least one node then takes long enough to respond for the lock to expire.
//
// Deprecated: Use Until instead. See https://github.com/go-redsync/redsync/issues/72.
func (m *Mutex) ValidContext(ctx context.Context) (bool, error) {
    n, err := m.actOnPoolsAsync(func(client *redis.Client) (bool, error) {
        return m.valid(ctx, client)
    })
    return n >= m.quorum, err
}

func (m *Mutex) valid(ctx context.Context, client *redis.Client) (bool, error) {
    reply, err := client.Get(ctx, m.name).Result()
    if err != nil {
        return false, err
    }
    return m.value == reply, nil
}

func (m *Mutex) touch(ctx context.Context, client *redis.Client, expiry int) (bool, error) {
    status, err := client.Eval(ctx, touchScript, []string{m.name}, m.value, expiry).Result()
    if err != nil {
        return false, err
    }
    return status != int64(0), nil
}

func (m *Mutex) actOnPoolsAsync(actFn func(*redis.Client) (bool, error)) (int, error) {
    type result struct {
        Node   int
        Status bool
        Err    error
    }

    ch := make(chan result)
    for node, client := range m.clients {
        go func(node int, client *redis.Client) {
            r := result{Node: node}
            r.Status, r.Err = actFn(client)
            ch <- r
        }(node, client)
    }
    n := 0
    var taken []int
    var err error
    for range m.clients {
        r := <-ch
        if r.Status {
            n++
        } else if r.Err != nil {
            err = errors.Join(err, &RedisError{Node: r.Node, Err: r.Err})
        } else {
            taken = append(taken, r.Node)
            err = errors.Join(err, &ErrNodeTaken{Node: r.Node})
        }
    }

    if len(taken) >= m.quorum {
        return n, &ErrTaken{Nodes: taken}
    }
    return n, err
}

红锁(Redlock)

上面的分布式锁主要是基于单个Redis实例来实现,为了提高系统的可用性,通常可能会使用多个Redis实例同时执行相同的锁操作,如果客户端在大多数(N/2+1)的Redis节点上成功获取到了锁,那么认为是加锁成功。这也就是红锁的基本思想。
红锁的基本原理如下:

获取锁:当一个客户端需要获取锁时,它会尝试在多个Redis节点上进行加锁操作。每个节点使用相同的锁名称、唯一的锁值(由客户端生成)和锁的过期时间。

锁的持有:如果客户端在大多数(N/2+1)的Redis节点上成功获取到了锁,并且在每个节点上的当前时间都小于锁的过期时间,那么认为锁已被成功获取。

锁的释放:客户端在释放锁时,会向所有持有锁的Redis节点发送解锁请求,要求删除对应的锁。

上面的代码已经实现了红锁的功能,代码参考并简化自: https://github.com/go-redsync/redsync

总结

1. 利用Redis和lua脚本进行原子操作,可实现在分布式系统中进行分布式锁操作
2. 需要根据失业的业务场景选择延长锁定时间或进行获取锁失败的容错操作
3. 红锁是在分布式锁的基础上,使用多个实例进行锁操作,如果(N/2+1)个客户端加锁成功,则认为获取到了锁

程序猿老龚(龚杰洪)原创,版权所有,转载请注明出处.

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注