更新時間:2020-11-04 來源:黑馬程序員 瀏覽量:
1. Redis分布式鎖實現原理
分布式鎖本質上要實現的目標就是在Redis里面占一個“茅坑”,當別的進程也要來占時,發(fā)現已經有人蹲在那里了,就只好放棄或者稍后再試。占坑一般是使用setnx(set if not exists)指令,只允許被一個客戶端占坑。先來先占,用完了,再調用del指令釋放茅坑。
死鎖問題:如果邏輯執(zhí)行到中間出現異常了,可能會導致del指令沒有被調用,這樣就會陷入死鎖,鎖永遠得不到釋放,解決這個問題我們在拿到鎖之后,再給鎖加上一個過期時間,比如 5s,這樣即使中間出現異常也可以保證 5 秒之后鎖會自動釋放。
2. 普通非阻塞鎖實現
public class RedisLock { private Jedis jedis; public RedisLock(Jedis jedis) { this.jedis = jedis; } public boolean lock(String key) { return jedis.set(key, "", "nx", "ex", 5L) != null; } public void unlock(String key) { jedis.del(key); } }
2.1 存在問題
如果某一個進程沒有拿到鎖得到了false的結果那么次進程是否執(zhí)行當前任務?顯然對于一般情況來說我們的任務都是必須執(zhí)行的那么此時我們就要考慮該何時執(zhí)行了,在傳統(tǒng)的鎖中我們如果沒有拿到鎖線程就進入了阻塞狀態(tài)那么此處我們是否可以改進同樣實現阻塞喚醒機制。
3. 分布式阻塞鎖具體實現
3.1 解決思路
(1)首先我們改造lock鎖,當不能創(chuàng)建key時就利用當前key阻塞當前線程
(2)當某一個線程釋放鎖時通過redis的pub/sub發(fā)送一個消息消息內容為key
(3)所有使用鎖的應用監(jiān)聽lock通道的消息,在收到消息時通過key喚醒對應線程
3.2具體實現
package com.hgy.common.redis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import java.util.HashMap; public class RedisLock extends JedisPubSub { //是否已經初始化監(jiān)聽 private static volatile boolean isListen = false; //每一個redis的key對應一個阻塞對象 private HashMap<String, Object> blockers = new HashMap<>(); private Jedis jedis; //當前獲得鎖的線程 private Thread curThread; public RedisLock(Jedis jedis) { this.jedis = jedis; //保證沒一個應用只初始化一次監(jiān)聽 if (!isListen) { synchronized (RedisLock.class) { if (!isListen) { // 啟動一個線程做消息監(jiān)聽 new Thread(()->{ new Jedis("192.168.200.128", 6379).subscribe(this,"lock"); }).start(); isListen = true; } } } } public void lock(String key) throws InterruptedException { //循環(huán)判斷是否能夠創(chuàng)建key, 不能則直接wait釋放CPU執(zhí)行權 while (jedis.set(key, "", "nx", "ex", 20L) == null) { synchronized (key) { System.out.println(Thread.currentThread().getName() + "======="+ key); blockers.put(key, key); key.wait(); } } blockers.put(key, key); //能夠成功創(chuàng)建,獲取鎖成功記錄當前獲取鎖線程 curThread = Thread.currentThread(); } public void unlock(String key) { //判斷是否為加鎖的線程執(zhí)行解鎖, 不是則直接忽略 if( curThread == Thread.currentThread()) { jedis.del(key); //刪除key之后需要notifyAll所有的應用, 所以這里采用發(fā)訂閱消息給所有的應用 jedis.publish("lock", key); } } /** * 所有應用接收到消息后在當前應用中執(zhí)行對應key的notifyAll方法 * @param channel * @param message */ @Override public void onMessage(String channel, String message) { Object lock = blockers.get(message); if(lock != null) { synchronized (lock) { lock.notifyAll(); } } } }
4.測試
目標: 開啟兩個mian線程, 在第一個中首先暫停3秒然后打印1-100然后線程休眠5秒釋放鎖并打印最后的毫秒數;main1在執(zhí)行的同時執(zhí)行main2,在2中打印開始時間;最后比對1和2的開始時間即可驗
證。
注意: 先啟動1然后啟動2
·main1
package com.hgy; import com.hgy.common.redis.RedisLock; import redis.clients.jedis.Jedis; public class RedisLockApp1 { private static RedisLock redisLock; public static void main(String[] args) throws InterruptedException { Jedis client = new Jedis("192.168.200.128", 6379); redisLock = new RedisLock(client); redisLock.lock("demo"); Thread.sleep(3000); for (int i = 0; i < 100; i++) { System.out.println("app1" + i); } Thread.sleep(5000); redisLock.unlock("demo"); System.out.println("App1==> end:" + System.currentTimeMillis()); } }
·main2
package com.hgy; import com.hgy.common.redis.RedisLock; import redis.clients.jedis.Jedis; public class RedisLockApp2 { private static RedisLock redisLock; public static void main(String[] args) throws InterruptedException { Jedis client = new Jedis("192.168.200.128", 6379); redisLock = new RedisLock(client); redisLock.lock("demo"); System.out.println("App2==> start:" + System.currentTimeMillis()); for (int i = 0; i < 100; i++) { System.out.println("app2" + i); } redisLock.unlock("demo"); } }
注意
如果細心的小伙伴兒可能已經發(fā)現了unlock其實不是一個原子操作,可能在未發(fā)布消息但刪除key之后的這段時間如果有人此時執(zhí)行l(wèi)ock那么可以直接拿到鎖;但是影響不大因為拿到鎖之后其他被阻塞的線程被喚醒之后將會繼續(xù)阻塞。
猜你喜歡