首页  

基于Redis 的主节点选举     所属分类 architecture 浏览量 86
MasterElectionService




import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.RandomUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


// 主节点选举
@Service
public class MasterElectionService {

    private static final Logger LOG = LoggerFactory.getLogger(MasterElectionService.class);

    @Resource
    RedisTemplate redisTemplate;


    //  秒 5S
    static final long HEARTBEAT_TIME_S = 5L;
    // 毫秒 5000MS 
    static final long HEARTBEAT_TIME_MS = HEARTBEAT_TIME_S * 1000L;

    // 锁过期时间 10个心跳时间 
    static final long LOCK_TIME_S = 10 * HEARTBEAT_TIME_S;

    static final long LOCK_TIME_MS = LOCK_TIME_S * 1000L;
    // 过期时间大于 该时间 认为非法 直接删除
    static final long LOCK_TIME_S_MAX = LOCK_TIME_S + HEARTBEAT_TIME_S;

    static final String NODE_ID = getNodeId();

    public static final String LOCK_KEY = "codefun007_lock_001";

    volatile long lockTtlMs = 0;
    volatile long updateTime = 0;

    private final Map<String,String> status = new ConcurrentHashMap<String, String>() ;

    private final AtomicLong runCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);


    public boolean isMaster(){
        if(lockTtlMs<=0 || updateTime<=0){
            return false;
        }
        long now = System.currentTimeMillis();
        long dif = now - updateTime;
        if(dif<0){
            // 不可能, 除非 时钟回拨了
            return false;
        }
        if(dif < lockTtlMs){
            return true;
        }

        return false;
    }


    // 5000 ms
    @Scheduled(fixedDelay = HEARTBEAT_TIME_MS)
    public void heartbeat(){
        // LOG.info("MasterElectionService_heartbeat_run,"+ LocalDateTime.now());
        try{
            runCount.getAndIncrement();
            updateLockStatus();
        }catch(Exception e){
            errorCount.getAndIncrement();
        }
    }

    public Map<String,Object> getInfo(){
        Map<String,Object> info = new LinkedHashMap<>();
        info.put("isMaster",isMaster());
        info.put("nodeId",NODE_ID);
        info.put("lockTtlMs",lockTtlMs);
        info.put("updateTime",updateTime);

        info.put("runCount",runCount.longValue());
        info.put("errorCount",errorCount.longValue());

        Object value = redisTemplate.opsForValue().get(LOCK_KEY);
        info.put("lockValue",value);

        Long expireTime = redisTemplate.getExpire(LOCK_KEY);
        info.put("expireTime",expireTime);

        info.put("status",status);

        return info;
    }



    private void updateLockStatus(){

        Object value = redisTemplate.opsForValue().get(LOCK_KEY);

        status.put("lockKeyValue",value+"@"+LocalDateTime.now());

        if(value == null){
            lockTtlMs = 0;
            updateTime = 0;
            // 锁没有被持有 , 争抢锁
            final String valueNew = NODE_ID +"_"+LocalDateTime.now();
            final long expireSeconds = LOCK_TIME_S;
            final long now = System.currentTimeMillis();
            boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, valueNew, expireSeconds, TimeUnit.SECONDS);

            status.put("try_lock_info",result+"@"+LocalDateTime.now());

            if(result){
                // 抢锁成功 ,更新状态
                lockTtlMs = LOCK_TIME_MS;
                updateTime = now;

                LOG.info("get_lock_success,key="+LOCK_KEY+",value="+valueNew);
            }

        }else{

            final long now = System.currentTimeMillis();
            Long expireTime = redisTemplate.getExpire(LOCK_KEY);
            // -1 key存在且未设置过期时间
            // -2 key不存在或已过期

            status.put("lockKeyExpireTime",expireTime+"@"+LocalDateTime.now());


            if(expireTime==null || expireTime.longValue()==-1 || expireTime.longValue() > LOCK_TIME_S_MAX){
                // 非法 删除
                lockTtlMs = 0;
                updateTime = 0;
                redisTemplate.delete(LOCK_KEY);

                status.put("lockKeyExpireTimeInvalidDelete",expireTime+"@"+LocalDateTime.now());
            }

            if(value.toString().startsWith(NODE_ID)){
                // 自己持有锁
                if(expireTime==null || expireTime.longValue()<=0){
                    lockTtlMs = 0;
                    updateTime = 0;
                }else{

                    lockTtlMs = expireTime.longValue() * 1000L;
                    updateTime = now;

                    if( expireTime.longValue() > (2*HEARTBEAT_TIME_S)){
                        // 存活时间 大于 2个心跳时间 ,继续续约
                        final long expireSeconds = LOCK_TIME_S;
                        Boolean result = redisTemplate.opsForValue().getOperations().expire(LOCK_KEY,expireSeconds, TimeUnit.SECONDS);
                        status.put("lockKeySetExpireTime",result+"@"+LocalDateTime.now());
                    }
                }

            }else{
                // 别的节点持有锁
                lockTtlMs = 0;
                updateTime = 0;
            }
        }
    }

    private static String getNodeId(){
        String nodeId = NetUtil.getLocalhostStr()+"_"+LocalDateTime.now()+"_"+ RandomUtil.randomString(9);
        return nodeId;
    }

}



上一篇     下一篇
毛主席名句

毛主席语录最经典50句

毛主席语录

9个高效工作法

交易员的九个段位

巴菲特 双目标清单系统(Two-List System)