基于Redis 的主节点选举
所属分类 architecture
浏览量 86
MasterElectionService
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.RandomUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Redis-based master (leader) election.
 *
 * <p>Every node periodically runs {@link #heartbeat()}. A node becomes master by creating
 * {@link #LOCK_KEY} with {@code SET ... NX} and a TTL of {@link #LOCK_TIME_S} seconds, and stays
 * master by extending that TTL on subsequent heartbeats. The node additionally keeps a local
 * lease ({@code lockTtlMs} / {@code updateTime}) so that {@link #isMaster()} can be answered
 * without a Redis round trip and automatically turns {@code false} once the last observed TTL
 * has elapsed, even if Redis becomes unreachable.
 */
@Service
public class MasterElectionService {

    private static final Logger LOG = LoggerFactory.getLogger(MasterElectionService.class);

    // Kept as a raw type on purpose: the field declaration is unchanged from the original so
    // that bean injection behaves exactly as before.
    @Resource
    RedisTemplate redisTemplate;

    /** Heartbeat interval in seconds (5 s). */
    static final long HEARTBEAT_TIME_S = 5L;

    /** Heartbeat interval in milliseconds (5000 ms). */
    static final long HEARTBEAT_TIME_MS = HEARTBEAT_TIME_S * 1000L;

    /** Lock TTL in seconds: ten heartbeat intervals. */
    static final long LOCK_TIME_S = 10 * HEARTBEAT_TIME_S;

    /** Lock TTL in milliseconds. */
    static final long LOCK_TIME_MS = LOCK_TIME_S * 1000L;

    /** A remaining TTL above this many seconds is treated as invalid and the key is deleted. */
    static final long LOCK_TIME_S_MAX = LOCK_TIME_S + HEARTBEAT_TIME_S;

    /** Identifier of this node, generated once when the class is initialised. */
    static final String NODE_ID = getNodeId();

    /** Redis key used as the election lock. */
    public static final String LOCK_KEY = "codefun007_lock_001";

    /** Length of the local lease in milliseconds; {@code 0} means "not master". */
    volatile long lockTtlMs = 0;

    /** Wall-clock time (ms) at which the local lease was last confirmed; {@code 0} means "never". */
    volatile long updateTime = 0;

    /** Diagnostic trail of the most recent Redis interactions, exposed through {@link #getInfo()}. */
    private final Map<String, Object> status = new ConcurrentHashMap<>();

    private final AtomicLong runCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);

    /**
     * Tells whether this node currently considers itself the master.
     *
     * <p>The answer is derived purely from the local lease; Redis is not contacted.
     *
     * @return {@code true} if a lease is recorded and has not yet run out
     */
    public boolean isMaster() {
        // Read each volatile field exactly once so every check below uses the same values.
        final long ttlMs = lockTtlMs;
        final long confirmedAt = updateTime;
        if (ttlMs <= 0 || confirmedAt <= 0) {
            return false;
        }
        final long elapsed = System.currentTimeMillis() - confirmedAt;
        // A negative elapsed time can only happen if the wall clock was moved backwards.
        return elapsed >= 0 && elapsed < ttlMs;
    }

    /**
     * Scheduled heartbeat: runs one election/renewal round every {@link #HEARTBEAT_TIME_MS} ms
     * (fixed delay). Failures are counted, recorded in the status map and logged; they never
     * propagate to the scheduler.
     */
    @Scheduled(fixedDelay = HEARTBEAT_TIME_MS)
    public void heartbeat() {
        runCount.getAndIncrement();
        try {
            updateLockStatus();
        } catch (Exception e) {
            errorCount.getAndIncrement();
            status.put("lastError", e + "@" + LocalDateTime.now());
            LOG.warn("MasterElectionService_heartbeat_error,key={}", LOCK_KEY, e);
        }
    }

    /**
     * Collects diagnostic information about the election state.
     *
     * <p>Besides the local counters and lease fields, the current lock value and remaining TTL
     * are read from Redis, so this method performs Redis calls.
     *
     * @return an insertion-ordered map with the keys {@code isMaster}, {@code nodeId},
     *         {@code lockTtlMs}, {@code updateTime}, {@code runCount}, {@code errorCount},
     *         {@code lockValue}, {@code expireTime} and {@code status}
     */
    public Map getInfo() {
        Map<String, Object> info = new LinkedHashMap<>();
        info.put("isMaster", isMaster());
        info.put("nodeId", NODE_ID);
        info.put("lockTtlMs", lockTtlMs);
        info.put("updateTime", updateTime);
        info.put("runCount", runCount.longValue());
        info.put("errorCount", errorCount.longValue());
        Object value = redisTemplate.opsForValue().get(LOCK_KEY);
        info.put("lockValue", value);
        Long expireTime = redisTemplate.getExpire(LOCK_KEY);
        info.put("expireTime", expireTime);
        // Hand out a copy so callers cannot modify the internal status map.
        info.put("status", new LinkedHashMap<String, Object>(status));
        return info;
    }

    /** Runs one election round: acquire the lock if it is free, otherwise validate/renew it. */
    private void updateLockStatus() {
        Object value = redisTemplate.opsForValue().get(LOCK_KEY);
        status.put("lockKeyValue", value + "@" + LocalDateTime.now());
        if (value == null) {
            tryAcquireLock();
        } else {
            inspectExistingLock(value);
        }
    }

    /** Nobody holds the lock: compete for it with SET NX + TTL and start a local lease on success. */
    private void tryAcquireLock() {
        clearLease();
        final String valueNew = NODE_ID + "_" + LocalDateTime.now();
        // Taken before the Redis call so the local lease never outlives the Redis TTL.
        final long now = System.currentTimeMillis();
        // Boxed on purpose: Spring Data Redis documents a null result in pipeline/transaction.
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(LOCK_KEY, valueNew, LOCK_TIME_S, TimeUnit.SECONDS);
        status.put("try_lock_info", result + "@" + LocalDateTime.now());
        if (Boolean.TRUE.equals(result)) {
            lockTtlMs = LOCK_TIME_MS;
            updateTime = now;
            LOG.info("get_lock_success,key={},value={}", LOCK_KEY, valueNew);
        }
    }

    /**
     * The lock key exists: delete it if its TTL is invalid, otherwise refresh the local lease
     * and extend the TTL when this node is the holder.
     *
     * @param value current value stored under {@link #LOCK_KEY}; never {@code null}
     */
    private void inspectExistingLock(Object value) {
        // Taken before the TTL lookup so the local lease errs on the short side.
        final long now = System.currentTimeMillis();
        Long expireTime = redisTemplate.getExpire(LOCK_KEY);
        // -1: key exists without an expiry; -2: key does not exist or has already expired.
        status.put("lockKeyExpireTime", expireTime + "@" + LocalDateTime.now());

        if (expireTime == null
                || expireTime.longValue() == -1L
                || expireTime.longValue() > LOCK_TIME_S_MAX) {
            // Invalid TTL: remove the key so that a fresh election can take place.
            clearLease();
            redisTemplate.delete(LOCK_KEY);
            status.put("lockKeyExpireTimeInvalidDelete", expireTime + "@" + LocalDateTime.now());
            // Stop here. Continuing would let this node record a lease (and call EXPIRE) for a
            // key it has just deleted, i.e. keep reporting itself as master while another node
            // is free to acquire the lock.
            return;
        }

        if (!value.toString().startsWith(NODE_ID)) {
            // Another node holds the lock.
            clearLease();
            return;
        }

        // This node holds the lock.
        final long ttlSeconds = expireTime.longValue();
        if (ttlSeconds <= 0) {
            clearLease();
            return;
        }
        lockTtlMs = ttlSeconds * 1000L;
        updateTime = now;
        if (ttlSeconds > 2 * HEARTBEAT_TIME_S) {
            // More than two heartbeats of TTL remain: extend the lock.
            // NOTE(review): the GET/TTL above and this EXPIRE are separate commands, so the key
            // could expire and be re-acquired by another node in between; an atomic
            // compare-and-expire would need a server-side script.
            Boolean result = redisTemplate.expire(LOCK_KEY, LOCK_TIME_S, TimeUnit.SECONDS);
            status.put("lockKeySetExpireTime", result + "@" + LocalDateTime.now());
        }
    }

    /** Drops the local lease so that {@link #isMaster()} returns {@code false}. */
    private void clearLease() {
        lockTtlMs = 0;
        updateTime = 0;
    }

    /**
     * Builds this node's identifier from the local host string reported by
     * {@code NetUtil.getLocalhostStr()}, the current local date-time and a 9-character random
     * string, joined with underscores.
     */
    private static String getNodeId() {
        return NetUtil.getLocalhostStr() + "_" + LocalDateTime.now() + "_" + RandomUtil.randomString(9);
    }
}
上一篇
下一篇
毛主席名句
毛主席语录最经典50句
毛主席语录
9个高效工作法
交易员的九个段位
巴菲特 双目标清单系统(Two-List System)