作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
Adnan Kukuljac's profile image

Adnan Kukuljac

An expert in graphics, robotics, and backends, Adnan专注于用c++构建高性能解决方案, JS and many other languages.

Expertise

Years of Experience

11

Share

微服务架构是设计和实现高度可扩展web应用程序的一种非常流行的方法. 单体应用程序中组件之间的通信通常基于同一进程中的方法或函数调用. A microservices‑based application, on the other hand, is a distributed system running on multiple machines.

为了拥有稳定和可扩展的系统,这些微服务之间的通信非常重要. There are multiple ways to do this. 基于消息的通信是可靠地做到这一点的一种方法.

在使用消息传递时,组件通过异步交换消息来相互交互. Messages are exchanged through channels.

促进服务a和服务B之间通信的消息传递系统的图形表示

When Service A wants to communicate with Service B, instead of sending it directly, A sends it to a specific channel. 当服务B想要读取消息时,它从一个特定的消息通道中获取消息.

In this Spring Integration tutorial, 您将学习如何使用Redis在Spring应用程序中实现消息传递. 您将了解一个示例应用程序,其中一个服务正在向队列中推送事件,而另一个服务正在逐个处理这些事件.

Spring Integration

Spring Integration项目扩展了Spring框架,为基于Spring的应用程序之间或内部的消息传递提供支持. Components are wired together via the messaging paradigm. 单个组件可能不知道应用程序中的其他组件.

Spring Integration为与外部系统通信提供了广泛的机制选择. 通道适配器就是用于单向集成(发送或接收)的一种机制。. 网关用于请求/应答场景(入站或出站).

Apache Camel is an alternative that is widely used. 在现有的基于Spring的服务中,Spring集成通常是首选,因为它是Spring生态系统的一部分.

Redis

Redis is an extremely fast in-memory data store. It can optionally persist to a disk also. 它支持不同的数据结构,如简单的键值对、集合、队列等.

使用Redis作为队列使得组件之间的数据共享和水平扩展变得更加容易. 一个生产者或多个生产者可以将数据推送到队列, 一个或多个消费者可以提取数据并处理事件.

多个使用者不能使用相同的事件—这确保一个事件只处理一次.

diagram showing producer/consumer architecture

Benefits of using Redis as a message queue:

  • 以非阻塞方式并行执行离散任务
  • Great performance
  • Stability
  • Easy monitoring and debugging
  • Easy implementation and usage

Rules:

  • 将任务添加到队列中应该比处理任务本身更快.
  • 消费任务应该比生产任务快(如果不是,添加更多的消费者).

Spring Integration with Redis

下面通过创建一个示例应用程序来解释如何使用Spring Integration与Redis.

假设您有一个允许用户发布帖子的应用程序. And you want to build a follow feature. 另一个要求是,每当有人发布帖子时, 所有的关注者应该通过一些通信渠道(如.g., email or push notification).

实现这一点的一种方法是,一旦用户发布内容,就向每个关注者发送电子邮件. But what happens when the user has 1,000 followers? 当1000个用户在10秒内发布内容时,每个用户都有1000个关注者? 此外,出版商的帖子将等到所有的电子邮件发送?

Distributed systems resolve this problem.

This specific problem could be resolved by using a queue. 负责发布帖子的服务A(生产者)将负责这一工作. 它将发布一个帖子,并推送一个事件,其中包含需要接收电子邮件和帖子本身的用户列表. The list of users could be fetched in service B, but for simplicity of this example, we will send it from service A.

This is an asynchronous operation. 这意味着正在发布的服务将不必等待发送电子邮件.

服务B(消费者)将从队列中提取事件并处理它. 通过这种方式,我们可以很容易地扩展我们的服务,我们可以做到 n consumers sending emails (processing events).

因此,让我们从生产者服务中的实现开始. Necessary dependencies are:


   redis.clients
   jedis


   org.springframework.data
   spring-data-redis


   org.springframework.integration
   spring-integration-redis

These three Maven dependencies are necessary:

  • Jedis is a Redis client.
  • Spring Data Redis的依赖使得在Java中更容易使用Redis. 它提供了熟悉的Spring概念,例如用于核心API使用的模板类和轻量级存储库风格的数据访问.
  • Spring Integration Redis提供了Spring编程模型的扩展,以支持众所周知的 Enterprise Integration Patterns.

Next, we need to configure the Jedis client:

@Configuration
public class RedisConfig {

   @Value("${redis.host}")
   private String redisHost;

   @Value("${redis.port:6379}")
   private int redisPort;

   @Bean
   public JedisPoolConfig poolConfig() {
       JedisPoolConfig poolConfig = new JedisPoolConfig();
       poolConfig.setMaxTotal(128);
       return poolConfig;
   }

   @Bean
   RedisConnectionFactory (JedisPoolConfig)
       最后JedisConnectionFactory连接工厂=新的JedisConnectionFactory();
       connectionFactory.setHostName(redisHost);
       connectionFactory.setPort(redisPort);
       connectionFactory.setPoolConfig(poolConfig);
       connectionFactory.setUsePool(true);
       return connectionFactory;
   }
}

The annotation @Value 意味着Spring将把应用程序属性中定义的值注入到字段中. This means redis.host and redis.port values should be defined in the application properties.

现在,我们需要定义要发送到队列的消息. A simple example message could look like:

@Getter
@Setter
@Builder
public class PostPublishedEvent {
   
   private String postUrl;
   private String postTitle;    
   private List emails;

}

Note: Project Lombok (http://projectlombok.org/) provides the @Getter, @Setter, @Builder,以及许多其他注释,以避免与getter、setter和其他琐碎的东西混淆代码. You can learn more about it from this Toptal article.

消息本身将以JSON格式保存在队列中. 每次将事件发布到队列时,消息将被序列化为JSON. 当从队列中消费时,消息将被反序列化.

定义了消息后,我们需要定义队列本身. In Spring Integration, it can be easily done via an .xml configuration. The configuration should be placed inside the resources/WEB-INF directory.




   

   
       
   

   
   
   

   

   


在配置中,您可以看到“int-redis:queue-outbound-channel-adapter”部分.” Its properties are:

  • id: The bean name of the component.
  • channel: MessageChannel from which this endpoint receives messages.
  • connection-factory: A reference to a RedisConnectionFactory bean.
  • queue: Redis列表的名称,在该列表上执行基于队列的推送操作以发送Redis消息. 此属性与队列表达式互斥.
  • queue-expression: 一个SpEL表达式,用于在运行时使用传入消息来确定Redis列表的名称 #root variable. This attribute is mutually exclusive with the queue.
  • serializer: A RedisSerializer bean reference. By default, it is a JdkSerializationRedisSerializer. However, for String payloads, a StringRedisSerializer is used if a serializer reference isn’t provided.
  • extract-payload: 指定这个端点是否应该只发送有效负载到Redis队列或整个消息. Its default value is true.
  • left-push: Specify whether this endpoint should use left push (when true) or right push (when false) to write messages to the Redis list. If true, 当与默认Redis队列入站通道适配器一起使用时,Redis列表充当FIFO队列. Set to false 与从左弹出列表中读取的软件一起使用,或实现类似堆栈的消息顺序. Its default value is true.

下一个步骤是定义网关,这在 .xml configuration. For a gateway, we are using the RedisChannelGateway class from the org.toptal.queue package.

StringRedisSerializer is used to serialize message before saving in Redis. Also in the .xml configuration, we defined the gateway and set RedisChannelGateway as a gateway service. This means that the RedisChannelGateway bean could be injected into other beans. We defined the property default-request-channel 方法来提供每个方法的通道引用 @Gateway annotation. Class definition:

public interface RedisChannelGateway {
   void enqueue(PostPublishedEvent event);
}

要将此配置连接到我们的应用程序中,我们必须导入它. This is implemented in the SpringIntegrationConfig class.

@ImportResource("classpath:WEB-INF/event-queue-config.xml")
@AutoConfigureAfter(RedisConfig.class)
@Configuration
public class SpringIntegrationConfig {
}

@ImportResource annotation is used to import Spring .xml configuration files into @Configuration. And @AutoConfigureAfter 注释用于提示应该在其他指定的自动配置类之后应用自动配置.

现在我们将创建一个服务并实现该方法 enqueue events to the Redis queue.

public interface QueueService {

   void enqueue(PostPublishedEvent event);
}
@Service
public class RedisQueueService implements QueueService {

   private RedisChannelGateway channelGateway;

   @Autowired
   public RedisQueueService(RedisChannelGateway) {
       this.channelGateway = channelGateway;
   }

   @Override
   public void enqueue(PostPublishedEvent event) {
       channelGateway.enqueue(event);
   }
}

方法将消息发送到队列 enqueue method from QueueService.

Redis队列只是一个或多个生产者和消费者的列表. To publish a message to a queue, producers use the LPUSH Redis command. And if you monitor Redis (hint: type redis-cli monitor), you can see that the message is added to the queue:

“my-event-queue”“LPUSH{\“postUrl \”,\“测试\”,\“postTitle \”:\“测试\”,\“邮件\”:[\“测试”\]}”

Now, 我们需要创建一个消费者应用程序,它将从队列中提取这些事件并对其进行处理. 消费者服务需要与生产者服务相同的依赖关系.

Now we can reuse the PostPublishedEvent class to deserialize messages.

我们需要创建队列配置,同样,它必须放在 resources/WEB-INF directory. The content of the queue config is:




   

   

   
       
   

   

   

   
       
   


In the .xml configuration, int-redis:queue-inbound-channel-adapter can have the following properties:

  • id: The bean name of the component.
  • channel: The MessageChannel to which we send messages from this endpoint.
  • auto-startup: A SmartLifecycle 属性指定此端点是否应在应用程序上下文启动后自动启动. Its default value is true.
  • phase: A SmartLifecycle 属性指定将在哪个阶段启动此端点. Its default value is 0.
  • connection-factory: A reference to a RedisConnectionFactory bean.
  • queue: Redis列表的名称,基于队列的pop操作将在其上执行以获取Redis消息.
  • error-channel: The MessageChannel to which we will send ErrorMessages with Exceptions from the listening task of the Endpoint. By default, the underlying MessagePublishingErrorHandler uses the default errorChannel from the application context.
  • serializer: The RedisSerializer bean reference. It can be an empty string, which means no serializer. In this case, the raw byte[] 从入站Redis消息发送到通道作为 Message payload. By default, it is a JdkSerializationRedisSerializer.
  • receive-timeout: pop操作等待来自队列的Redis消息的超时(以毫秒为单位). Its default value is 1 second.
  • recovery-interval: 在弹出操作出现异常后,侦听器任务在重新启动侦听器任务之前应该休眠的时间(以毫秒为单位).
  • expect-message: 指定此端点是否期望来自Redis队列的数据包含整个消息. If this attribute is set to true, 序列化器不能是空字符串,因为消息需要某种形式的反序列化(默认是JDK序列化)。. Its default value is false.
  • task-executor: A reference to a Spring TaskExecutor (or standard JDK 1.5+ Executor) bean. It is used for the underlying listening task. By default, a SimpleAsyncTaskExecutor is used.
  • right-pop: 指定此端点是否应该使用正确的pop(当 true) or left pop (when false) to read messages from the Redis list. If true,当与默认Redis队列出站通道适配器一起使用时,Redis列表充当FIFO队列. Set to false 与用右压向列表写入的软件一起使用,或实现类似堆栈的消息顺序. Its default value is true.

The important part is the “service activator,,它定义了应该使用哪个服务和方法来处理事件.’

Also, the json-to-object-transformer 需要一个type属性以便将JSON转换为对象,上面设置为 type="com.toptal.integration.spring.model.PostPublishedEvent".

Again, to wire this config, we will need the SpringIntegrationConfig class, which can be the same as before. 最后,我们需要一个实际处理事件的服务.

public interface EventProcessingService {
   void process(PostPublishedEvent event);
}

@Service("RedisEventProcessingService")
RedisEventProcessingService实现EventProcessingService

   @Override
   public void process(PostPublishedEvent event) {
       // TODO: Send emails here, retry strategy, etc :)
   }

}

Once you run the application, you can see in Redis:

"BRPOP" "my-event-queue" "1"

Conclusion

With Spring Integration and Redis, 构建Spring微服务应用程序并不像通常那样令人生畏. 只需少量的配置和少量的样板代码, you can build the foundations of your microservice architecture in no time.

即使您不打算完全放弃当前的Spring项目并切换到新的体系结构, with the help of Redis, 使用队列获得巨大的性能改进非常简单.

Understanding the basics

  • What is microservices architecture?

    基于微服务的应用程序是运行在多台机器上的分布式系统. 系统中的每个服务通过向其他服务传递消息进行通信.

  • What is monolithic architecture in software?

    In a monolithic application, 所有组件都位于同一进程中,通信通常基于同一进程中的方法或函数调用.

Hire a Toptal expert on this topic.
Hire Now
Adnan Kukuljac's profile image
Adnan Kukuljac

Located in Berlin, Germany

Member since October 19, 2017

About the author

An expert in graphics, robotics, and backends, Adnan专注于用c++构建高性能解决方案, JS and many other languages.

Toptal作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

Expertise

Years of Experience

11

World-class articles, delivered weekly.

Subscription implies consent to our privacy policy

World-class articles, delivered weekly.

Subscription implies consent to our privacy policy

Toptal Developers

Join the Toptal® community.