Detailed explanation of Lettuce, the advanced Redis client

Premise

Lettuce is a Java driver package for Redis. I first ran into it while troubleshooting some problems with RedisTemplate: stepping through the underlying source code in a debugger, I found that spring-data-redis had switched its driver package to Lettuce after a certain version. Lettuce is simply the English word for the leafy vegetable (yes, the kind you eat), and that is what its logo depicts.

Since Lettuce has been adopted by the Spring ecosystem, it must have something going for it, so the author spent some time reading its official documentation, sorting out test examples and writing this article. The versions used here are Lettuce 5.1.8.RELEASE, Spring Boot 2.1.8.RELEASE and JDK [8, 11]. Warning, this is a very long read: the article took two weeks to finish on and off and runs to more than 40,000 words.

Introduction to Lettuce

Lettuce is a high-performance Redis driver framework written in Java. Under the hood it integrates Project Reactor to provide native reactive programming, and its communication layer is built on Netty and uses non-blocking IO. Since version 5.x it also integrates the asynchronous programming features of JDK 1.8, offering a rich and easy-to-use API while maintaining high performance. The new features of version 5.1 are as follows:

Note: the Redis version must be at least 2.6. The newer the better, of course, and API compatibility is fairly strong.

A single dependency is all you need to start using Lettuce, declared with either Maven or Gradle:

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.1.8.RELEASE</version>
</dependency>

dependencies {
  compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}

Connect to Redis

Connecting to Redis in stand-alone, sentinel or cluster mode requires a uniform way to describe the connection details. In Lettuce that uniform standard is RedisURI. There are three ways to construct a RedisURI instance:

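// 1. From a custom URI string
RedisURI uri = RedisURI.create("redis://localhost/");
// 2. With the builder
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
// 3. With the constructor (Lettuce 5.x takes the timeout as a java.time.Duration)
RedisURI uri = new RedisURI("localhost",6379,Duration.ofSeconds(60));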

Custom connection URI syntax

Stand-alone (prefix redis://):

Format: redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
Full: redis://mypassword@127.0.0.1:6379/0?timeout=10s
Simple: redis://localhost

Stand-alone over SSL (prefix rediss://):

Format: rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
Full: rediss://mypassword@127.0.0.1:6379/0?timeout=10s
Simple: rediss://localhost

Stand-alone over a Unix domain socket (prefix redis-socket://):

Format: redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
Full: redis-socket:///tmp/redis?timeout=10s&_database=0

Sentinel (prefix redis-sentinel://):

Format: redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
Full: redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster

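Timeout units: d (days), h (hours), m (minutes), s (seconds), ms (milliseconds), us (microseconds), ns (nanoseconds).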
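
To make the URI format more concrete, here is a minimal sketch of how such a string breaks down into its parts, using only java.net.URI and java.time.Duration. It is not Lettuce's own parser: the RedisUriSketch class and its methods are hypothetical names introduced for illustration, and both the fallback to port 6379 and the treatment of a timeout without a unit as seconds are assumptions of this sketch.

```java
import java.net.URI;
import java.time.Duration;

/** Hypothetical helper, not part of Lettuce: reads the pieces of a redis:// URI. */
final class RedisUriSketch {

    /** Converts a timeout such as "10s" or "500ms" into a Duration. */
    static Duration parseTimeout(String text) {
        int split = 0;
        while (split < text.length() && Character.isDigit(text.charAt(split))) {
            split++;
        }
        long amount = Long.parseLong(text.substring(0, split));
        String unit = text.substring(split);
        switch (unit) {
            case "d":  return Duration.ofDays(amount);
            case "h":  return Duration.ofHours(amount);
            case "m":  return Duration.ofMinutes(amount);
            case "":   // assumption of this sketch: no unit means seconds
            case "s":  return Duration.ofSeconds(amount);
            case "ms": return Duration.ofMillis(amount);
            case "us": return Duration.ofNanos(amount * 1_000);
            case "ns": return Duration.ofNanos(amount);
            default:   throw new IllegalArgumentException("Unknown timeout unit: " + unit);
        }
    }

    /** Summarises host, port, database, password presence and timeout of a redis:// URI. */
    static String describe(String text) {
        URI uri = URI.create(text);
        String host = uri.getHost();
        // Assumption: fall back to the default Redis port when none is given
        int port = uri.getPort() == -1 ? 6379 : uri.getPort();
        String path = uri.getPath();
        int database = (path == null || path.length() <= 1) ? 0 : Integer.parseInt(path.substring(1));
        boolean hasPassword = uri.getUserInfo() != null;
        Duration timeout = null;
        String query = uri.getQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                if (pair.startsWith("timeout=")) {
                    timeout = parseTimeout(pair.substring("timeout=".length()));
                }
            }
        }
        return host + ":" + port + " db=" + database + " password=" + hasPassword + " timeout=" + timeout;
    }
}
```

For example, RedisUriSketch.describe("redis://mypassword@127.0.0.1:6379/0?timeout=10s") yields 127.0.0.1:6379 db=0 password=true timeout=PT10S, where PT10S is how java.time.Duration prints ten seconds.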

Personally, I recommend the builder provided by RedisURI: the custom URI string is concise, but it is easy to get wrong by hand. Since the author has no use case for SSL or Unix domain sockets, those two connection methods are not covered below.

Basic use

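Lettuce relies on four main components: RedisURI (the connection information), RedisClient (the client), a connection (StatefulRedisConnection) and the command API obtained from it (RedisCommands).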

A basic use example is as follows:

@Test
public void testSetGet() throws Exception {
    RedisURI redisUri = RedisURI.builder()                    // <1> connection information for a stand-alone connection
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);   // <2> create the client
    StatefulRedisConnection<String,String> connection = redisClient.connect();     // <3> create a thread-safe connection
    RedisCommands<String,String> redisCommands = connection.sync();                // <4> obtain the synchronous command API
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    String result = redisCommands.set("name","throwable",setArgs);
    Assertions.assertThat(result).isEqualToIgnoringCase("OK");
    result = redisCommands.get("name");
    Assertions.assertThat(result).isEqualTo("throwable");
    // ... other operations
    connection.close();   // <5> close the connection
    redisClient.shutdown();  // <6> shut down the client
}

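Note: close the connection (<5>) before shutting down the client (<6>), and do both only once they are no longer needed.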

API

Lettuce mainly provides three APIs: synchronous (sync), asynchronous (async) and reactive.

First prepare a stand-alone Redis connection to be reused by the examples below:

private static StatefulRedisConnection<String,String> CONNECTION;
private static RedisClient CLIENT;

@BeforeClass
public static void beforeClass() {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    CLIENT = RedisClient.create(redisUri);
    CONNECTION = CLIENT.connect();
}

@AfterClass
public static void afterClass() throws Exception {
    CONNECTION.close();
    CLIENT.shutdown();
}

The concrete implementations of the Redis command APIs can be obtained directly from a StatefulRedisConnection instance. See its interface definition:

public interface StatefulRedisConnection<K,V> extends StatefulConnection<K,V> {

    boolean isMulti();

    RedisCommands<K,V> sync();

    RedisAsyncCommands<K,V> async();

    RedisReactiveCommands<K,V> reactive();
}    

Note that when no RedisCodec is specified, the StatefulRedisConnection created by RedisClient is a StatefulRedisConnection<String,String>; that is, the keys and values of all command APIs are of type String, which covers most usage scenarios. You can of course supply a custom RedisCodec<K,V> when necessary.

Synchronous API

First build the RedisCommands instance:

private static RedisCommands<String,String> COMMAND;

@BeforeClass
public static void beforeClass() {
    COMMAND = CONNECTION.sync();
}

Basic usage:

@Test
public void testSyncPing() throws Exception {
   String pong = COMMAND.ping();
   Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}


@Test
public void testSyncSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    COMMAND.set("name","throwable",setArgs);
    String value = COMMAND.get("name");
    log.info("Get value: {}",value);
}

// Get value: throwable

With the synchronous API, each command call blocks until its result is available and then returns it directly. If you are familiar with Jedis, using RedisCommands is not much different.

Asynchronous API

First build the RedisAsyncCommands instance:

private static RedisAsyncCommands<String,String> ASYNC_COMMAND;

@BeforeClass
public static void beforeClass() {
    ASYNC_COMMAND = CONNECTION.async();
}

Basic usage:

@Test
public void testAsyncPing() throws Exception {
    RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
    log.info("Ping result:{}",redisFuture.get());
}
// Ping result:PONG

Every method of RedisAsyncCommands returns a RedisFuture instance. The RedisFuture interface is defined as follows:

public interface RedisFuture<V> extends CompletionStage<V>,Future<V> {

    String getError();

    boolean await(long timeout,TimeUnit unit) throws InterruptedException;
}    

In other words, a RedisFuture can be used seamlessly with the methods of Future and with those that CompletableFuture, introduced in JDK 1.8, offers through CompletionStage. For example:

@Test
public void testAsyncSetAndGet1() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    RedisFuture<String> future = ASYNC_COMMAND.set("name","throwable",setArgs);
    // CompletableFuture#thenAccept()
    future.thenAccept(value -> log.info("Set command returned:{}",value));
    // Future#get()
    future.get();
}
// Set command returned:OK

@Test
public void testAsyncSetAndGet2() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    CompletableFuture<Void> result =
            ASYNC_COMMAND.set("name","throwable",setArgs)
                    .thenAcceptBoth(ASYNC_COMMAND.get("name"),(s,g) -> {
                                log.info("Set command returned:{}",s);
                                log.info("Get command returned:{}",g);
                            })
                    .toCompletableFuture();
    result.get();
}
// Set command returned:OK
// Get command returned:throwable

If you are comfortable with CompletableFuture and functional programming, you can combine multiple RedisFuture instances to carry out a series of complex operations.
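
Because RedisFuture extends CompletionStage, the usual composition operators apply to it. The following is a minimal sketch of two such combinations, using plain CompletableFuture instances as stand-ins for RedisFuture so that it runs without a Redis server. FutureCompositionSketch, fakeGet, joinNames and lengthOrZero are hypothetical names introduced for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class FutureCompositionSketch {

    /** Stand-in for an asynchronous GET; a real RedisFuture would be completed by the driver. */
    static CompletionStage<String> fakeGet(String value) {
        return CompletableFuture.supplyAsync(() -> value);
    }

    /** Combines two independent results once both are available. */
    static CompletableFuture<String> joinNames(CompletionStage<String> first,
                                               CompletionStage<String> second) {
        return first.thenCombine(second, (a, b) -> a + "," + b).toCompletableFuture();
    }

    /** Chains a dependent step and falls back to 0 if the stage fails or yields null. */
    static CompletableFuture<Integer> lengthOrZero(CompletionStage<String> stage) {
        return stage.thenApply(String::length)   // throws NullPointerException for a null value
                .exceptionally(error -> 0)       // so a missing value ends up as 0
                .toCompletableFuture();
    }
}
```

With these helpers, joinNames(fakeGet("throwable"), fakeGet("doge")).join() produces "throwable,doge", and lengthOrZero applied to a stage that completes with null (as GET does for a missing key) produces 0 instead of an exception.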

Reactive API

The reactive programming framework adopted by Lettuce is Project Reactor. If you have no experience with reactive programming, it is worth getting to know Project Reactor first.

Build the RedisReactiveCommands instance:

private static RedisReactiveCommands<String,String> REACTIVE_COMMAND;

@BeforeClass
public static void beforeClass() {
    REACTIVE_COMMAND = CONNECTION.reactive();
}

Following Project Reactor's conventions, a RedisReactiveCommands method returns a Mono when the result contains only 0 or 1 elements, and a Flux when the result contains 0 to N elements (N greater than 0). For example:

@Test
public void testReactivePing() throws Exception {
    Mono<String> ping = REACTIVE_COMMAND.ping();
    ping.subscribe(v -> log.info("Ping result:{}",v));
    Thread.sleep(1000);
}
// Ping result:PONG

@Test
public void testReactiveSetAndGet() throws Exception {
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    REACTIVE_COMMAND.set("name","throwable",setArgs).block();
    REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get command returned:{}",value));
    Thread.sleep(1000);
}
// Get command returned:throwable

@Test
public void testReactiveSet() throws Exception {
    REACTIVE_COMMAND.sadd("food","bread","meat","fish").block();
    Flux<String> flux = REACTIVE_COMMAND.smembers("food");
    flux.subscribe(log::info);
    REACTIVE_COMMAND.srem("food","fish").block();
    Thread.sleep(1000);
}
// meat
// bread
// fish

Here is a more complex example that involves a transaction, function transformation and so on:

@Test
public void testReactiveFunctional() throws Exception {
    REACTIVE_COMMAND.multi().doOnSuccess(r -> {
        REACTIVE_COMMAND.set("counter","1").doOnNext(log::info).subscribe();
        REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
    }).flatMap(s -> REACTIVE_COMMAND.exec())
            .doOnNext(transactionResult -> log.info("Discarded:{}",transactionResult.wasDiscarded()))
            .subscribe();
    Thread.sleep(1000);
}
// OK
// 2
// Discarded:false

This method starts a transaction. First set the counter to 1, and then increase the counter by 1.

Publish and subscribe

Publish/subscribe in non-cluster mode relies on the dedicated connection StatefulRedisPubSubConnection, and in cluster mode on the dedicated connection StatefulRedisClusterPubSubConnection. They are obtained from the RedisClient#connectPubSub() family of methods and from RedisClusterClient#connectPubSub() respectively:

// A client in a non-cluster mode: stand-alone, normal master-slave, sentinel, etc.
RedisClient client = ...
StatefulRedisPubSubConnection<String,String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String,String>() { ... });

// Synchronous commands
RedisPubSubCommands<String,String> sync = connection.sync();
sync.subscribe("channel");

// Asynchronous commands
RedisPubSubAsyncCommands<String,String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// Reactive commands
RedisPubSubReactiveCommands<String,String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(message -> {...}).subscribe();

// Usage in cluster mode is basically the same as in non-cluster mode
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String,String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String,String>() { ... });
RedisPubSubCommands<String,String> sync = connection.sync();
sync.subscribe("channel");
// ...

Here is an example of Redis keyspace notifications, using synchronous commands against a stand-alone instance:

@Test
public void testSyncKeyspaceNotification() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            // Note: this can only be database 0 here
            .withDatabase(0)
            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String,String> redisConnection = redisClient.connect();
    RedisCommands<String,String> redisCommands = redisConnection.sync();
    // Only receive key-expiration events
    redisCommands.configSet("notify-keyspace-events","Ex");
    StatefulRedisPubSubConnection<String,String> connection = redisClient.connectPubSub();
    connection.addListener(new RedisPubSubAdapter<String,String>() {

        @Override
        public void psubscribed(String pattern,long count) {
            log.info("pattern:{},count:{}",pattern,count);
        }

        @Override
        public void message(String pattern,String channel,String message) {
            log.info("pattern:{},channel:{},message:{}",pattern,channel,message);
        }
    });
    });
    RedisPubSubCommands<String,String> commands = connection.sync();
    commands.psubscribe("__keyevent@0__:expired");
    redisCommands.setex("name",2,"throwable");
    Thread.sleep(10000);
    redisConnection.close();
    connection.close();
    redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

In practice, a RedisPubSubListener implementation can be extracted into a class of its own; try not to write it as an anonymous inner class.

Transaction and batch command execution

The transaction-related commands are WATCH, UNWATCH, EXEC, MULTI and DISCARD, and the RedisCommands family of interfaces has a corresponding method for each. For example:

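// Synchronous mode
@Test
public void testSyncMulti() throws Exception {
    COMMAND.multi();
    // SETEX requires a TTL in seconds; 2 is an assumed value, as in the keyspace notification example
    COMMAND.setex("name-1",2,"throwable");
    COMMAND.setex("name-2",2,"doge");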
    TransactionResult result = COMMAND.exec();
    int index = 0;
    for (Object r : result) {
        log.info("Result-{}:{}",index,r);
        index++;
    }
}
// Result-0:OK
// Result-1:OK

Redis's pipeline mechanism can be understood as packing multiple commands into a single request sent to the Redis server, which then packs all the responses and returns them in one go, thereby saving network resources (mainly by reducing the number of network round trips). Redis does not prescribe how a pipeline must be implemented, nor does it provide special commands for it. Jedis uses BIO (blocking IO) underneath, so its approach is to buffer the commands on the client, wait for an explicit trigger, then synchronously send one huge list of commands and receive and parse one huge list of responses.

In Lettuce the pipeline is transparent to users: because the underlying communication framework is Netty, Lettuce itself needs to do little to optimize network communication. Put another way, Netty implements Redis's pipeline mechanism for Lettuce at the bottom layer. Even so, Lettuce's asynchronous API also provides a way to flush manually:

@Test
public void testAsyncManualFlush() {
    // Disable automatic flushing
    ASYNC_COMMAND.setAutoFlushCommands(false);
    List<RedisFuture<?>> redisFutures = Lists.newArrayList();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        redisFutures.add(ASYNC_COMMAND.set(key,value));
        redisFutures.add(ASYNC_COMMAND.expire(key,2));
    }
    long start = System.currentTimeMillis();
    ASYNC_COMMAND.flushCommands();
    boolean result = LettuceFutures.awaitAll(10,TimeUnit.SECONDS,redisFutures.toArray(new RedisFuture[0]));
    Assertions.assertThat(result).isTrue();
    log.info("Lettuce cost:{} ms",System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms

The above is just the theory from the documentation, and reality falls short of it. A comparison with the pipeline that Jedis provides shows that the Jedis Pipeline takes less time to execute:

@Test
public void testJedisPipeline() throws Exception {
    Jedis jedis = new Jedis();
    Pipeline pipeline = jedis.pipelined();
    int count = 5000;
    for (int i = 0; i < count; i++) {
        String key = "key-" + (i + 1);
        String value = "value-" + (i + 1);
        pipeline.set(key,value);
        pipeline.expire(key,2);
    }
    long start = System.currentTimeMillis();
    pipeline.syncAndReturnAll();
    log.info("Jedis cost:{} ms",System.currentTimeMillis()  - start);
}
// Jedis cost:9 ms

My personal guess is that Lettuce may not merge all the commands and send them at once at the bottom layer (it may even send them one by one); capturing packets would be needed to confirm this. From this point of view, if you have scenarios that execute large batches of Redis commands, you might as well use the Jedis Pipeline.
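
To illustrate why the number of flushes matters, here is a minimal sketch of the write-batching side of pipelining. It makes no claim about what Lettuce or Jedis actually do internally: a counting stream stands in for the socket, commands are encoded as Redis protocol (RESP) arrays of bulk strings, and FlushBatchingSketch with all of its members is a hypothetical name introduced for illustration.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FlushBatchingSketch {

    /** Pretend socket: counts how many times data is handed over to it. */
    static final class CountingStream extends OutputStream {
        int writes;

        @Override
        public void write(int b) {
            writes++;
        }

        @Override
        public void write(byte[] buffer, int offset, int length) {
            writes++;
        }
    }

    /** Encodes one command as a RESP array of bulk strings. */
    static byte[] encode(String... parts) {
        StringBuilder sb = new StringBuilder("*").append(parts.length).append("\r\n");
        for (String part : parts) {
            sb.append('$').append(part.getBytes(StandardCharsets.UTF_8).length).append("\r\n")
              .append(part).append("\r\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Sends `count` SET commands and returns how many writes reached the pretend socket. */
    static int socketWrites(int count, boolean flushEachCommand) {
        CountingStream socket = new CountingStream();
        // 1 MiB buffer: large enough to hold every command of this small example
        try (BufferedOutputStream out = new BufferedOutputStream(socket, 1 << 20)) {
            for (int i = 1; i <= count; i++) {
                out.write(encode("SET", "key-" + i, "value-" + i));
                if (flushEachCommand) {
                    out.flush();   // one socket write per command
                }
            }
            out.flush();           // manual flush: everything still buffered goes out at once
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return socket.writes;
    }
}
```

In this sketch, socketWrites(5000, true) reports 5000 writes while socketWrites(5000, false) reports a single one. A real client would also have to read the responses, which the sketch leaves out.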

Note: from the timing comparison above it can be inferred that RedisTemplate's executePipelined() method is a fake pipeline execution method. Keep this in mind when using RedisTemplate.

Lua script execution

The synchronous interface for executing Redis Lua commands in Lettuce is as follows:

public interface RedisScriptingCommands<K,V> {

    <T> T eval(String script,ScriptOutputType type,K... keys);

    <T> T eval(String script,ScriptOutputType type,K[] keys,V... values);

    <T> T evalsha(String digest,ScriptOutputType type,K... keys);

    <T> T evalsha(String digest,ScriptOutputType type,K[] keys,V... values);

    List<Boolean> scriptExists(String... digests);

    String scriptFlush();

    String scriptKill();

    String scriptLoad(V script);

    String digest(V script);
}

The asynchronous and reactive interfaces define similar methods; the difference lies in the return types. The methods used most often are eval(), evalsha() and scriptLoad(). A simple example:

private static RedisCommands<String,String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
        "local value = ARGV[1]\n" +
        "local timeout = ARGV[2]\n" +
        "redis.call('SETEX',key,tonumber(timeout),value)\n" +
        "local result = redis.call('GET',key)\n" +
        "return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();

@Test
public void testLua() throws Exception {
    LUA_SHA.compareAndSet(null,COMMANDS.scriptLoad(RAW_LUA));
    String[] keys = new String[]{"name"};
    String[] args = new String[]{"throwable","5000"};
    String result = COMMANDS.evalsha(LUA_SHA.get(),ScriptOutputType.VALUE,keys,args);
    log.info("Get value:{}",result);
}
// Get value:throwable
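
The digest that SCRIPT LOAD returns and that EVALSHA expects is the SHA-1 hash of the script text, written as 40 lowercase hexadecimal characters. As a minimal sketch of that calculation using only the JDK (ScriptDigestSketch and sha1Hex are hypothetical names, not Lettuce APIs):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ScriptDigestSketch {

    /** Returns the lowercase hexadecimal SHA-1 of the script text. */
    static String sha1Hex(String script) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] hash = sha1.digest(script.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));   // two hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the algorithms every Java platform must provide
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```

Computing the digest on the client makes it possible to check with scriptExists() whether a script is already cached on the server before deciding between evalsha() and scriptLoad().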

High availability and sharding

For Redis high availability, the usual options are normal master-slave (Master/Replica; what I call normal master-slave mode here means replication only, with failures requiring a manual switchover), sentinel and cluster. Normal master-slave can run on its own or together with sentinel, with sentinel adding automatic failover and master promotion. Both normal master-slave and sentinel setups can use MasterSlave, which takes a RedisClient, a codec and one or more RedisURIs as input parameters and returns the corresponding connection instance.

Note that if the MasterSlave method you call takes a single RedisURI instance, Lettuce uses the topology discovery mechanism to obtain the Redis master and slave node information automatically. If it takes a collection of RedisURIs, then for normal master-slave mode all node information is static and will not be discovered or updated.

The rules for topology discovery are as follows:

The API behind the topology discovery mechanism is TopologyProvider; to understand how it works, refer to its concrete implementations.

For cluster mode, Lettuce provides a separate set of APIs.

In addition, when a Lettuce connection targets more than a single Redis node, the connection instance offers a read preference setting (ReadFrom). The available values are:

Normal master-slave mode

Suppose three Redis services form a tree-shaped master-slave relationship, as follows:

With the first approach, dynamic node discovery, connect to the master-slave setup as follows:

@Test
public void testDynamicReplica() throws Exception {
    // Only one node's connection information is needed here; it does not have to be the master, a slave works too
    RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisClient redisClient = RedisClient.create(uri);
    StatefulRedisMasterSlaveConnection<String,String> connection = MasterSlave.connect(redisClient,new Utf8StringCodec(),uri);
    // Read only from slave nodes
    connection.setReadFrom(ReadFrom.SLAVE);
    // Execute other Redis commands
    connection.close();
    redisClient.shutdown();
}

To specify the Redis master and slave nodes statically, build the connection as follows:

@Test
public void testStaticReplica() throws Exception {
    List<RedisURI> uris = new ArrayList<>();
    RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
    RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
    RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
    uris.add(uri1);
    uris.add(uri2);
    uris.add(uri3);
    RedisClient redisClient = RedisClient.create();
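    StatefulRedisMasterSlaveConnection<String,String> connection = MasterSlave.connect(redisClient,new Utf8StringCodec(),uris);
    // Read only from the master node
    connection.setReadFrom(ReadFrom.MASTER);
    // Execute other Redis commands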
    connection.close();
    redisClient.shutdown();
}

Sentinel mode

Because Lettuce itself provides a sentinel topology discovery mechanism, you only need to configure the RedisURI of a single sentinel node:

@Test
public void testDynamicSentinel() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withPassword("your-password")
            .withSentinel("localhost",26379)
            .withSentinelMasterId("your-sentinel-master-id")
            .build();
    RedisClient redisClient = RedisClient.create();
    StatefulRedisMasterSlaveConnection<String,String> connection = MasterSlave.connect(redisClient,new Utf8StringCodec(),redisUri);
    // Only allow reads from slave nodes
    connection.setReadFrom(ReadFrom.SLAVE);
    RedisCommands<String,String> command = connection.sync();
    SetArgs setArgs = SetArgs.Builder.nx().ex(5);
    command.set("name","throwable",setArgs);
    String value = command.get("name");
    log.info("Get value:{}",value);
}
// Get value:throwable

Cluster mode

The author is not very familiar with Redis cluster mode, and the APIs come with quite a few restrictions when used in cluster mode, so this is only a brief introduction to how to use them. Let's start with a few features:

The following API provides the function of calling across slots:

Static node selection function:

Cluster topology view dynamic update function:

Refer to the official documentation for the detailed process of setting up a Redis cluster. Assume a cluster has already been set up as follows (192.168.56.200 is the host of the author's virtual machine):

Simple cluster connection and usage are as follows:

@Test
public void testSyncCluster(){
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String,String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String,String> commands = connection.sync();
    commands.setex("name",10,"throwable");
    String value = commands.get("name");
    log.info("Get value:{}",value);
}
// Get value:throwable
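
Redis Cluster spreads keys over 16384 hash slots, which is where the restrictions on commands that touch several keys come from. As a rough illustration (not Lettuce's own code), the slot of a key can be computed as CRC16(key) mod 16384, hashing only the part between the first { and the next } when that hash tag is non-empty. ClusterSlotSketch and its methods are hypothetical names introduced for this sketch.

```java
import java.nio.charset.StandardCharsets;

final class ClusterSlotSketch {

    static final int SLOT_COUNT = 16384;

    /** CRC16-CCITT in its XMODEM variant: polynomial 0x1021, initial value 0, no reflection. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;   // keep the register at 16 bits
            }
        }
        return crc;
    }

    /** Maps a key to its hash slot, honouring a non-empty {hash tag} if one is present. */
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);   // hash only the tag content
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
```

Keys that share a hash tag, such as {user1000}.following and {user1000}.followers, map to the same slot, so multi-key commands on them stay on a single node.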

Node selection:

@Test
public void testSyncNodeSelection() {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
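    StatefulRedisClusterConnection<String,String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String,String> commands = connection.sync();
//  commands.all();  // all nodes
//  commands.masters();  // master nodes
    // Slave nodes, read-only
    NodeSelection<String,String> replicas = commands.slaves();
    NodeSelectionCommands<String,String> nodeSelectionCommands = replicas.commands();
    // For demonstration only; the keys * command should normally be disabled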
    Executions<List<String>> keys = nodeSelectionCommands.keys("*");
    keys.forEach(key -> log.info("key: {}",key));
    connection.close();
    redisClusterClient.shutdown();
}

Refresh the cluster topology view periodically (every ten minutes here; choose the interval yourself, but do not make it too frequent):

@Test
public void testPeriodicClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
            .builder()
            .enablePeriodicRefresh(Duration.of(10,ChronoUnit.MINUTES))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
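    StatefulRedisClusterConnection<String,String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String,String> commands = connection.sync();
    commands.setex("name",10,"throwable");
    String value = commands.get("name");
    log.info("Get value:{}",value);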
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

Adaptively update the cluster topology view:

@Test
public void testAdaptiveClusterTopology() throws Exception {
    RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
    RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
    ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
            .enableAdaptiveRefreshTrigger(
                    ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
            )
            .adaptiveRefreshTriggersTimeout(Duration.of(30,ChronoUnit.SECONDS))
            .build();
    redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
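    StatefulRedisClusterConnection<String,String> connection = redisClusterClient.connect();
    RedisAdvancedClusterCommands<String,String> commands = connection.sync();
    commands.setex("name",10,"throwable");
    String value = commands.get("name");
    log.info("Get value:{}",value);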
    Thread.sleep(Integer.MAX_VALUE);
    connection.close();
    redisClusterClient.shutdown();
}

Dynamic and custom commands

Custom commands still draw on the finite set of Redis commands, but let you specify the key, ARGV, command type, codec and return type at a finer granularity. They rely on the dispatch() method:

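// A custom implementation of the PING method
@Test
public void testCustomPing() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String,String> connect = redisClient.connect();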
    RedisCommands<String,String> sync = connect.sync();
    RedisCodec<String,String> codec = StringCodec.UTF8;
    String result = sync.dispatch(CommandType.PING,new StatusOutput<>(codec));
    log.info("PING:{}",result);
    connect.close();
    redisClient.shutdown();
}
// PING:PONG

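// A custom implementation of the SET method
@Test
public void testCustomSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String,String> connect = redisClient.connect();
    RedisCommands<String,String> sync = connect.sync();
    RedisCodec<String,String> codec = StringCodec.UTF8;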
    sync.dispatch(CommandType.SETEX,new StatusOutput<>(codec),new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
    String result = sync.get("name");
    log.info("Get value:{}",result);
    connect.close();
    redisClient.shutdown();
}
// Get value:throwable

Dynamic commands are also based on the finite set of Redis commands; they implement more complex command combinations through annotations and dynamic proxies. The main annotations live under the io.lettuce.core.dynamic.annotation package. For example:

public interface CustomCommand extends Commands {

    // SET [key] [value]
    @Command("SET ?0 ?1")
    String setKey(String key,String value);

    // SET [key] [value]
    @Command("SET :key :value")
    String setKeyNamed(@Param("key") String key,@Param("value") String value);

    // MGET [key1] [key2]
    @Command("MGET ?0 ?1")
    List<String> mGet(String key1,String key2);

    /**
     * Use the method name as the command
     */
    @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
    String mSet(String key1,String value1,String key2,String value2);
}


@Test
public void testCustomDynamicSet() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
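            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    StatefulRedisConnection<String,String> connect = redisClient.connect();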
    RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
    CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
    commands.setKey("name","throwable");
    commands.setKeyNamed("throwable","doge");
    log.info("MGET ===> " + commands.mGet("name","throwable"));
    commands.mSet("key1","value1","key2","value2");
    log.info("MGET ===> " + commands.mGet("key1","key2"));
    connect.close();
    redisClient.shutdown();
}
// MGET ===> [throwable,doge]
// MGET ===> [value1,value2]

Advanced features

Lettuce has many advanced features. Only the two that I consider most commonly used are covered here: configuring client resources and using a connection pool.

More other features can be found in the official documentation.

Configure client resources

Client resource settings affect Lettuce's performance, concurrency and event handling. Thread pools and thread groups (EventLoopGroups and EventExecutorGroups) make up most of the client resource configuration, and they are the basic building blocks of the connection machinery. In general, client resources should be shared among multiple Redis clients and shut down once they are no longer in use. As the author sees it, client resources are essentially Netty-facing. Note: unless you know Lettuce very well or have spent a long time testing and tuning, changing the defaults by intuition and without experience may well land you in trouble.

The client resource interface is ClientResources, and its implementation class is DefaultClientResources.

Build a DefaultClientResources instance:

// Default
ClientResources resources = DefaultClientResources.create();

// Builder
ClientResources resources = DefaultClientResources.builder()
                        .ioThreadPoolSize(4)
                        .computationThreadPoolSize(4)
                        .build();

Usage:

ClientResources resources = DefaultClientResources.create();
// Non-cluster
RedisClient client = RedisClient.create(resources,uri);
// Cluster
RedisClusterClient clusterClient = RedisClusterClient.create(resources,uris);
// ......
client.shutdown();
clusterClient.shutdown();
// Shut down the resources
resources.shutdown();

Basic configuration of client resources:

Advanced configuration of client resources:

Property configuration of the non-cluster client RedisClient:

The non-cluster client RedisClient itself provides a method for configuring properties:

RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
                       .autoReconnect(false)
                       .pingBeforeActivateConnection(true)
                       .build());

List of configuration properties of the non-cluster client:

Property configuration of the cluster client:

The cluster client RedisClusterClient itself provides a method for configuring properties:

RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(Duration.ofMinutes(10))
                .enableAllAdaptiveRefreshTriggers()
                .build();

client.setOptions(ClusterClientOptions.builder()
                       .topologyRefreshOptions(topologyRefreshOptions)
                       .build());

List of configuration properties of cluster client:

Use connection pool

Add the connection pool dependency commons-pool2:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency>

The basic usage is as follows:

@Test
public void testUseConnectionPool() throws Exception {
    RedisURI redisUri = RedisURI.builder()
            .withHost("localhost")
            .withPort(6379)
            .withTimeout(Duration.of(10,ChronoUnit.SECONDS))
            .build();
    RedisClient redisClient = RedisClient.create(redisUri);
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    GenericObjectPool<StatefulRedisConnection<String,String>> pool
            = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect,poolConfig);
    try (StatefulRedisConnection<String,String> connection = pool.borrowObject()) {
        RedisCommands<String,String> command = connection.sync();
        SetArgs setArgs = SetArgs.Builder.nx().ex(5);
        command.set("name","throwable",setArgs);
        String n = command.get("name");
        log.info("Get value:{}",n);
    }
    pool.close();
    redisClient.shutdown();
}

Here, ConnectionPoolSupport provides pooling for synchronous connections, while AsyncConnectionPoolSupport provides pooling for asynchronous connections (only available since Lettuce 5.1).

Several common examples of progressive deletion

Progressively delete the field-value entries of a Hash:

@Test
public void testDelBigHashKey() throws Exception {
    // SCAN arguments
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // Temporary cursor
    ScanCursor cursor = ScanCursor.INITIAL;
    // Target key
    String key = "BIG_HASH_KEY";
    prepareHashTestData(key);
    log.info("Start progressively deleting Hash entries...");
    int counter = 0;
    do {
        MapScanCursor<String,String> result = COMMAND.hscan(key,cursor,scanArgs);
        // Reset the temporary cursor
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        // HDEL takes field names, so collect the map keys rather than the values
        Collection<String> fields = result.getMap().keySet();
        if (!fields.isEmpty()) {
            COMMAND.hdel(key,fields.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("Finished progressively deleting Hash entries, iterations: {} ...",counter);
}

private void prepareHashTestData(String key) throws Exception {
    COMMAND.hset(key,"1","1");
    COMMAND.hset(key,"2","2");
    COMMAND.hset(key,"3","3");
    COMMAND.hset(key,"4","4");
    COMMAND.hset(key,"5","5");
}

Progressively delete the elements of a Set:

@Test
public void testDelBigSetKey() throws Exception {
    String key = "BIG_SET_KEY";
    prepareSetTestData(key);
    // SCAN arguments
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // Temporary cursor
    ScanCursor cursor = ScanCursor.INITIAL;
    log.info("Start progressively deleting Set elements...");
    int counter = 0;
    do {
        // Pass the cursor so that each iteration continues where the previous one stopped
        ValueScanCursor<String> result = COMMAND.sscan(key,cursor,scanArgs);
        // Reset the temporary cursor
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<String> values = result.getValues();
        if (!values.isEmpty()) {
            COMMAND.srem(key,values.toArray(new String[0]));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("Finished progressively deleting Set elements, iterations: {} ...",counter);
}

private void prepareSetTestData(String key) throws Exception {
    COMMAND.sadd(key,"5");
}

Progressively delete the elements of a Sorted Set (ZSet):

@Test
public void testDelBigZSetKey() throws Exception {
    // SCAN arguments
    ScanArgs scanArgs = ScanArgs.Builder.limit(2);
    // Temporary cursor
    ScanCursor cursor = ScanCursor.INITIAL;
    // Target key
    String key = "BIG_ZSET_KEY";
    prepareZSetTestData(key);
    log.info("Start progressively deleting ZSet elements...");
    int counter = 0;
    do {
        // Pass the cursor so that each iteration continues where the previous one stopped
        ScoredValueScanCursor<String> result = COMMAND.zscan(key,cursor,scanArgs);
        // Reset the temporary cursor
        cursor = ScanCursor.of(result.getCursor());
        cursor.setFinished(result.isFinished());
        List<ScoredValue<String>> scoredValues = result.getValues();
        if (!scoredValues.isEmpty()) {
            COMMAND.zrem(key,scoredValues.stream().map(ScoredValue::getValue).toArray(String[]::new));
        }
        counter++;
    } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
    log.info("Finished progressively deleting ZSet elements, iterations: {} ...",counter);
}

private void prepareZSetTestData(String key) throws Exception {
    // ZADD needs a score for every member; the scores here are illustrative only
    COMMAND.zadd(key,1,"1");
    COMMAND.zadd(key,2,"2");
    COMMAND.zadd(key,3,"3");
    COMMAND.zadd(key,4,"4");
    COMMAND.zadd(key,5,"5");
}
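
The three examples above share the same loop: scan one page, delete what the page returned, then continue from the returned cursor until it comes back as "0". The following is only a minimal sketch of that pattern in plain Java, with no Lettuce dependency. The names ScanPage, InMemorySet and ProgressiveDeleteSketch are hypothetical, and the in-memory scan is a simplified stand-in: a real Redis cursor is opaque, COUNT is only a hint, and elements may be returned more than once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical type: one page of a SCAN-style iteration.
final class ScanPage {
    final String nextCursor;    // "0" means the iteration is complete
    final List<String> members; // elements returned by this page

    ScanPage(String nextCursor, List<String> members) {
        this.nextCursor = nextCursor;
        this.members = members;
    }
}

// Hypothetical in-memory stand-in for a big Redis Set (members must not be "0").
final class InMemorySet {
    private final TreeSet<String> members = new TreeSet<>();

    void add(String... values) { members.addAll(Arrays.asList(values)); }
    void remove(List<String> values) { members.removeAll(values); }
    int size() { return members.size(); }

    // Returns up to 'count' (>= 1) members after the cursor; cursor "0" starts from the beginning.
    ScanPage scan(String cursor, int count) {
        Iterator<String> it = "0".equals(cursor)
                ? members.iterator()
                : members.tailSet(cursor, false).iterator();
        List<String> page = new ArrayList<>();
        while (it.hasNext() && page.size() < count) {
            page.add(it.next());
        }
        // If more members remain, the last returned member serves as the next cursor.
        String next = it.hasNext() ? page.get(page.size() - 1) : "0";
        return new ScanPage(next, page);
    }
}

final class ProgressiveDeleteSketch {

    // Scans and deletes page by page; returns the number of scan iterations performed.
    static int deleteAll(BiFunction<String, Integer, ScanPage> scan,
                         Consumer<List<String>> delete,
                         int pageSize) {
        String cursor = "0"; // initial cursor
        int iterations = 0;
        do {
            ScanPage page = scan.apply(cursor, pageSize);
            if (!page.members.isEmpty()) {
                delete.accept(page.members);
            }
            cursor = page.nextCursor;
            iterations++;
        } while (!"0".equals(cursor));
        return iterations;
    }

    public static void main(String[] args) {
        InMemorySet set = new InMemorySet();
        set.add("a", "b", "c", "d", "e");
        int iterations = deleteAll(set::scan, set::remove, 2);
        System.out.println("iterations=" + iterations + ", remaining=" + set.size());
    }
}
```

With Lettuce, the scan function would wrap a call such as hscan, sscan or zscan, and the delete callback would wrap hdel, srem or zrem.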

Using Lettuce in Spring Boot

Personally, I don't think the API encapsulation in spring-data-redis is very good: it is heavy to use and not flexible enough. Here, building on the earlier examples and code, we configure and integrate Lettuce in a Spring Boot scaffold project. First, import the dependencies:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.8.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
</dependencies>        

Generally, each application should use a single Redis client instance and a single connection instance. The scaffold designed here adapts to four scenarios: single node, ordinary master-replica, sentinel and cluster. For client resources, the default implementation is sufficient. For the Redis connection properties, the main ones are host, port and password; the others can be ignored for now. Following the principle of convention over configuration, first define a series of property classes (some of them could be shared entirely, but to keep the relationships between the classes clear, they are split here into several property classes and several configuration methods):

@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {

    private LettuceSingleProperties single;
    private LettuceReplicaProperties replica;
    private LettuceSentinelProperties sentinel;
    private LettuceClusterProperties cluster;

}

@Data
public class LettuceSingleProperties {

    private String host;
    private Integer port;
    private String password;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {

}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {

    private String masterId;
}

@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {

}
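
For readers less familiar with Lombok: @Data generates getters, setters, equals, hashCode and toString, and the accessors follow JavaBean naming (for example getPassword()). The following is only a hand-written approximation of what it produces for LettuceSingleProperties. The class name PlainSingleProperties is hypothetical, and Lombok's real output differs in details such as an extra canEqual method.

```java
import java.util.Objects;

// Hypothetical hand-written equivalent of a Lombok @Data class with host, port and password.
class PlainSingleProperties {

    private String host;
    private Integer port;
    private String password;

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }

    public Integer getPort() { return port; }
    public void setPort(Integer port) { this.port = port; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PlainSingleProperties)) {
            return false;
        }
        PlainSingleProperties other = (PlainSingleProperties) o;
        return Objects.equals(host, other.host)
                && Objects.equals(port, other.port)
                && Objects.equals(password, other.password);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port, password);
    }

    @Override
    public String toString() {
        return "PlainSingleProperties(host=" + host + ", port=" + port + ", password=" + password + ")";
    }
}
```

Spring Boot's property binding relies on the default constructor and these setters, which is why the property classes need no further code.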

The configuration class is as follows. It mainly uses @ConditionalOnProperty for isolation, since few applications use more than one Redis connection scenario:

@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {

    private final LettuceProperties lettuceProperties;

    @Bean(destroyMethod = "shutdown")
    public ClientResources clientResources() {
        return DefaultClientResources.create();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisURI singleRedisUri() {
        LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
        return RedisURI.builder()
                .withHost(singleProperties.getHost())
                .withPort(singleProperties.getPort())
                .withPassword(singleProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public RedisClient singleRedisClient(ClientResources clientResources,@Qualifier("singleRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources,redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.single.host")
    public StatefulRedisConnection<String,String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
        return singleRedisClient.connect();
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisURI replicaRedisUri() {
        LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
        return RedisURI.builder()
                .withHost(replicaProperties.getHost())
                .withPort(replicaProperties.getPort())
                .withPassword(replicaProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public RedisClient replicaRedisClient(ClientResources clientResources,@Qualifier("replicaRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources,redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.replica.host")
    public StatefulRedisMasterSlaveConnection<String,String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,@Qualifier("replicaRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(replicaRedisClient,StringCodec.UTF8,redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisURI sentinelRedisUri() {
        LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
        return RedisURI.builder()
                .withPassword(sentinelProperties.getPassword())
                .withSentinel(sentinelProperties.getHost(),sentinelProperties.getPort())
                .withSentinelMasterId(sentinelProperties.getMasterId())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public RedisClient sentinelRedisClient(ClientResources clientResources,@Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return RedisClient.create(clientResources,redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.sentinel.host")
    public StatefulRedisMasterSlaveConnection<String,String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,@Qualifier("sentinelRedisUri") RedisURI redisUri) {
        return MasterSlave.connect(sentinelRedisClient,StringCodec.UTF8,redisUri);
    }

    @Bean
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisURI clusterRedisUri() {
        LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
        return RedisURI.builder()
                .withHost(clusterProperties.getHost())
                .withPort(clusterProperties.getPort())
                .withPassword(clusterProperties.getPassword())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public RedisClusterClient redisClusterClient(ClientResources clientResources,@Qualifier("clusterRedisUri") RedisURI redisUri) {
        return RedisClusterClient.create(clientResources,redisUri);
    }

    @Bean(destroyMethod = "close")
    @ConditionalOnProperty(name = "lettuce.cluster.host")
    public StatefulRedisClusterConnection<String,String> clusterConnection(RedisClusterClient clusterClient) {
        return clusterClient.connect();
    }
}
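
As a rough illustration, the single-node beans above would be activated by a configuration such as the following in application.properties. The values are placeholders and not from the original article. The sketch sets all three properties because the bean method passes them straight to the RedisURI builder.

```properties
# Placeholder values, adjust to your environment
lettuce.single.host=localhost
lettuce.single.port=6379
lettuce.single.password=your-password
```

The other scenarios use the same keys under the lettuce.replica, lettuce.sentinel and lettuce.cluster prefixes, and the sentinel case additionally needs lettuce.sentinel.master-id.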

Finally, to help the IDE recognize our configuration properties, add the file spring-configuration-metadata.json to the /META-INF folder, with the following content:

{
  "properties": [
    {
      "name": "lettuce.single",
      "type": "club.throwable.spring.lettuce.LettuceSingleProperties",
      "description": "Single-node configuration",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.replica",
      "type": "club.throwable.spring.lettuce.LettuceReplicaProperties",
      "description": "Master-replica configuration",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.sentinel",
      "type": "club.throwable.spring.lettuce.LettuceSentinelProperties",
      "description": "Sentinel configuration",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    },
    {
      "name": "lettuce.cluster",
      "type": "club.throwable.spring.lettuce.LettuceClusterProperties",
      "description": "Cluster configuration",
      "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
    }
  ]
}

For even better IDE support, you can add /META-INF/additional-spring-configuration-metadata.json with more detailed definitions. Simple usage looks like this:

@Slf4j
@Component
public class RedisCommandLineRunner implements CommandLineRunner {

    @Autowired
    @Qualifier("singleRedisConnection")
    private StatefulRedisConnection<String,String> connection;

    @Override
    public void run(String... args) throws Exception {
        RedisCommands<String,String> redisCommands = connection.sync();
        redisCommands.setex("name",5,"throwable");
        log.info("Get value:{}",redisCommands.get("name"));
    }
}
// Get value:throwable

Summary

This article is based on the official Lettuce documentation and walks through its usage, including examples of the main features and configuration options. Due to space limitations, some features and configuration details are not covered. Lettuce has been adopted by spring-data-redis as its official Redis client driver, so it can be trusted. Many of its API designs are indeed reasonable, extensible and flexible. In my experience, it is easy to add configuration to a Spring Boot application based directly on the Lettuce package. After all, RedisTemplate is rather cumbersome, and it also hides some of Lettuce's advanced features and flexible APIs.


(end of this article c-14-d e-a-20190928 too many things have happened recently...)

Technical official account (Throwable Digest), which pushes original technical articles from time to time (never plagiarized or reposted):
