今天看啥  ›  专栏  ›  Leviv

Redisson 应用于复杂业务注意点和优化点

Leviv  · 掘金  ·  · 2021-05-22 00:53
阅读 29

Redisson 应用于复杂业务注意点和优化点

为什么要使用Redis?

在游戏的跨服业务中:

  1. 如果需要多个服联动需要自定义多条跨服协议通讯(至少4条,跨服请求、返回,个人跨服请求返回),调试起来非常麻烦。
  2. 比较难保证每一个服的数据是同步的,经常发现某些服跟主服数据不一致的情况。
  3. 旧项目的跨服架构不支持玩家不在跨服发跨服个人请求,如果需要发生跨服请求,需要带大量的本服数据进行跨服请求。

是否能有一个应用在不消耗过多性能的情况独立在这些服外面的第三方应用做到统一调配几个服的数据,并且能屏蔽大多数跨服的细节专注本服的业务。所以我们使用了Redis。

先把结论放在前面:

  • 在 Redisson 框架使用过程中,问题都是出现在自己的使用方法上而并非Redis上。
  • Redis作为一个内存数据库还是很快。

1.在同一业务中,多次使用同步查询

错误发生的场景: 在同一业务中,误认为redis是内存数据库很快(事实上也是很快,但是会影响本机性能),不慎使用了多次请求。

/**
 * Redisson错误示范2
 * 在同一业务中,多次使用同步查询
 */
 @Test
public void test2() {
    for (int i = 0; i < 10000; i++) {
        client.getMap("test").getOrDefault(1, 0);
    }
}
复制代码
结果:同一结果多次同步查询,消耗性能
复制代码

2.在异步RFuture里面用同步方法,导致同时阻塞两个线程,最终导致所有线程阻塞

错误发生的场景: 同时查询多个参数的时候,需要起多个查询 如:同时需要A、B、C数据的时候,我们需要连续起查询

/**
 * Redisson错误示范1,在异步RFuture里面用同步方法
 * 导致同时阻塞两个线程
 */
 @Test
public void test() {
    for (int i = 0; i < 1000; i++) {
        RMap<Integer, Integer> map = client.getMap("test");
        RFuture<Integer> future = map.addAndGetAsync(1, 1);
        future.onComplete((value1, throwable) -> {
            System.out.println("value1 ==============" + value1);
            Object value2 = client.getMap("test").getOrDefault(1, 0);
            System.out.println("value2 ==============" + value2);
        });
    }
复制代码

结果: 简单来说,就是netty线程都被阻塞了。Redisson希望你增加网络线程缓和一下并发

new RedisTimeoutException("Command still hasn't been written into connection! Increase nettyThreads and/or retryInterval settings. Payload size in bytes: " + totalSize
        + ". Node source: " + source + ", connection: " + connectionFuture.getNow()
        + ", command: " + LogHelper.toString(command, params)
        + " after " + attempt + " retry attempts");
复制代码

根据上面两个问题的解决方案:

总结来说就是要解决两件事:减少与redis服务端的通讯、到本地服务器尽量异步执行

基于查询的优化方案:

方案1:如果这个请求是用于了锁,那只需要查询一次,如果有修改最后在加回去

/**
 * Redisson解决方案1
 * 问题:在同一业务中,多次使用同步查询
 * 方案:如果这个请求是用于了锁,那只需要查询一次,如果有修改最后在加回去
 */
 @Test
public void solve1() {
    RLock lock = client.getLock("testLock");
    boolean isLock = lock.tryLock();
    if (isLock) {
        try {
            RMap<Integer, Integer> map = client.getMap("test");
            Integer num = map.getOrDefault(1, 0);
            Result result = new Result(num);
            for (int i = 0; i < 10000; i++) {
                result.addValue(1);
            }
            map.put(1, result.getValue());
        } finally {
            lock.unlock();
        }
    }
}

复制代码

方案2:如果改数据不是频繁修改,建议使用缓存(RLocalCachedMap)

/**
 * Redisson解决方案2
 * 问题:在同一业务中,多次使用同步查询
 * 方案:如果改数据不是频繁修改,建议使用缓存(RLocalCachedMap)
 */
 @Test
public void solve2() {
    RLocalCachedMap<Integer, Integer> map = client.getLocalCachedMap("test", options);
    for (int i = 0; i < 1000; i++) {
        Integer value = map.getOrDefault(1, 0);
        System.out.println("value :" + value);
    }
}
复制代码

RLocalCachedMap 源码简析

  1. RLocalCachedMap 初始化的时候就会向redis服务器发送一个以自己命名的SUBSCRIBE 命令。
  2. RLocalCachedMap 在修改的时候,除了修改也会发一条以自己命名的PUBLISH 命令。接到这个命令的时候就会修改在本机的缓存 updateCache()。

RLocalCachedMap.init(初始化)

private void init(String name, LocalCachedMapOptions<K, V> options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
    // 创建监听器
    listener = new LocalCacheListener(name, commandExecutor, this, codec, options, cacheUpdateLogTime) {
        @Override
        protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
            CacheKey cacheKey = toCacheKey(keyBuf);
            Object key = codec.getMapKeyDecoder().decode(keyBuf, null);
            Object value = codec.getMapValueDecoder().decode(valueBuf, null);
            cachePut(cacheKey, key, value);
        }
    };
    // 发送监听到redis服务端
    listener.add(cache);
    }


@Override
public <M> int addListener(Class<M> type, MessageListener<? extends M> listener) {
    RFuture<Integer> future = addListenerAsync(type, (MessageListener<M>) listener);
    commandExecutor.syncSubscription(future);
    return future.getNow();
}


RLocalCachedMap.putOperationAsync(修改值)
 @Override
protected RFuture<V> putOperationAsync(K key, V value) {
    ByteBuf mapKey = encodeMapKey(key);
    CacheKey cacheKey = toCacheKey(mapKey);
    CacheValue prevValue = cachePut(cacheKey, key, value);
    broadcastLocalCacheStore(value, mapKey, cacheKey);

    if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
        V val = null;
        if (prevValue != null) {
            val = (V) prevValue.getValue();
        }
        return RedissonPromise.newSucceededFuture(val);
    }

    ByteBuf mapValue = encodeMapValue(value);
    byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
    ByteBuf msg = createSyncMessage(mapKey, mapValue, cacheKey);
    // 向所有订阅了这个map的主题的服发生已经被修改的信息
    return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
              "local v = redis.call('hget', KEYS[1], ARGV[1]); "
            + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
            + "if ARGV[4] == '1' then "
                + "redis.call('publish', KEYS[2], ARGV[3]); "
            + "end;"
            + "if ARGV[4] == '2' then "
                + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
                + "redis.call('publish', KEYS[2], ARGV[3]); "
            + "end;"
            + "return v; ",
            Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), 
            mapKey, mapValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
}



Redisson没有提供队列的缓存,我们可以根据上面的逻辑实现一个自己的带缓存的队列
public class RLocalCachedQueue<V> {
    private ConcurrentLinkedQueue<V> cQueue = new ConcurrentLinkedQueue<V>();
    private RQueue<V> rQueue;
    private RTopic rTopic;

    public RLocalCachedQueue(Class clazz, String name, RedissonClient client) {
        super();
        this.rQueue = client.getQueue(name);
        this.rTopic = client.getTopic(name);
        this.rTopic.addListener(clazz, (channel, msg) -> {
            if (msg == null) {
                return;
            }
            V v = (V) msg;
            cQueue.add(v);
        });
        this.cQueue.addAll(rQueue.readAll());
    }

    public RFuture<Boolean> add(V v) {
        RFuture<Boolean> future = rQueue.offerAsync(v);
        rTopic.publish(v);
        return future;
    }

    public ConcurrentLinkedQueue<V> getQueue() {
        return cQueue;
    }
}
复制代码

==========================================================================================================================================================

基于修改的优化方案

以上说的就是 Redisson 查询上的优化,那添加(修改)。是不是也有优化的方案呢? 添加(修改)主要是从 减少与redis服务端的通讯 处理的。

方案1:使用Redis的管道
这个比较简单,就是把几条命令合成一条命令发给redis服务端
/**
 * Redisson优化写的方案 1
 * 方案:使用redis的管道功能
 */
@Test
public void write_solve1() {
    RBatch batch = client.createBatch();
    for (int i = 0; i < 100; i++) {
        batch.getMap("test").addAndGetAsync(1, 1);
    }
    batch.executeAsync();
}
复制代码
方案2:使用Redis的脚本

首先说下RBatch的局限,如果我想在管道里面做运算的操作是不能做到的。 如: 卖商品,当库存到0的时候,就不需要再减了。不然就会变负数了。

上面这种场景RBatch是做不到的,这个时候就需要用到Redis的lua脚本了。

/**
 * Redisson优化写的方案 2
 * 方案:使用Redis的脚本
 */
@Test
public void write_solve2() {
    List<Object> keys = new ArrayList<>();
    keys.add("\"stock\"");
    keys.add("\"amount\"");
    client.getScript().evalAsync(RScript.Mode.READ_ONLY, RedisScript.STOCK_SCRIPT, RScript.ReturnType.INTEGER, keys);
}


// 扣库存脚本
public static final String STOCK_SCRIPT =
        "if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then\n" +
                "local stock = tonumber(redis.call('hget', KEYS[1], KEYS[2]));\n" +
                "if (stock > 0) then\n" +
                " redis.call('hincrby', KEYS[1], KEYS[2], -1);\n" +
                "return stock;\n" +
                "end;\n" +
                "return 0;\n" +
                "end;";

复制代码



原文地址:访问原文地址
快照地址: 访问文章快照