消息队列

1.消息队列

消息队列模式

点对点

点对点模式(Point-to-Point):在点对点模式中,消息被发送者(生产者)发送到一个队列,然后被一个接收者(消费者)从队列中取出并处理。在这种模式下,即使有多个消费者,每个消息也只会被处理一次,因为一旦消息被消费,它就会从队列中移除。如果有多个消费者,那么他们通常会以竞争的方式来获取队列中的消息,这种方式也被称为“竞争消费者”模式。这种模式适用于任务分发的场景,例如,将大量的任务分发给一组工作线程进行处理。

发布订阅

发布/订阅模式(Publish/Subscribe):在发布/订阅模式中,消息被发送者(发布者)发送到一个主题(Topic),然后被所有订阅了该主题的接收者(订阅者)接收。在这种模式下,每个消息都会被所有的订阅者接收和处理,因此一个消息可以被多次处理。这种模式适用于需要广播消息到多个接收者的场景,例如,实时更新的股票价格信息,需要广播给所有订阅了该股票信息的用户。

单个消息可以被多个订阅者并发的获取和处理。一般来说,订阅有两种类型:

临时(ephemeral)订阅,这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。 持久(durable)订阅,这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。

临时订阅和持久订阅是消息队列中两种不同类型的订阅方式,主要区别在于订阅的生命周期和消息的处理方式。

临时订阅(Ephemeral Subscription):这种订阅只在消费者启动并运行的时候存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。这种订阅的实现通常依赖于消费者与消息系统的会话(session)。当消费者启动并连接到消息系统时,它会创建一个新的会话并在该会话中创建订阅。当消费者退出或断开连接时,会话结束,所有在该会话中创建的临时订阅也会被删除。在业务中,临时订阅通常用于处理不需要持久化的数据,例如实时的状态更新或临时的事件通知。临时订阅通常用于那些不需要持久化消息,或者消费者始终在线的场景,例如实时的聊天系统或游戏。

持久订阅(Durable Subscription):这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。持久订阅的实现通常需要消费者在创建订阅时提供一个唯一的订阅标识符。消息系统会使用这个标识符来持久化订阅的状态,包括订阅的主题和已经发送但尚未确认的消息。当消费者重新连接并使用相同的订阅标识符时,消息系统会恢复该订阅,并重新发送所有未确认的消息。在业务中,持久订阅通常用于处理需要持久化的数据,例如电子邮件或订单处理。在业务中,持久订阅通常用于那些需要保证消息不丢失,或者消费者可能会离线的场景

技术选型

吞吐率:不同的消息队列系统在吞吐量和延迟上有不同的表现。例如,如果的应用需要处理大量的消息,那么可能需要一个高吞吐量的消息队列,如Kafka。如果的应用需要实时处理消息,那么可能需要一个低延迟的消息队列,如RabbitMQ

可靠性:如果的应用不能容忍消息的丢失,那么需要一个支持持久化和事务的消息队列,如RabbitMQ和ActiveMQ。如果的应用可以容忍少量的消息丢失,那么可以选择一个提供“至少一次”或“最多一次”投递保证的消息队列,如Kafka

至少一次(At-Least-Once):这种语义保证每个消息至少被投递一次。这可能导致消息的重复投递,因为在某些情况下,如网络故障或消费者崩溃,消息系统可能无法确定消息是否已经被成功处理,所以它会选择重新投递消息。这种语义适用于不能容忍消息丢失的场景,但应用需要能够处理重复的消息。

最多一次(At-Most-Once):这种语义保证每个消息最多被投递一次。这可能导致消息的丢失,因为在某些情况下,如网络故障或消息系统崩溃,已经投递的消息可能无法被重新投递。这种语义适用于可以容忍消息丢失,但不能处理重复消息的场景。

恰好一次(Exactly-Once),这种语义保证每个消息恰好被投递一次,既不会丢失也不会重复。然而,实现这种语义通常需要复杂的协议和高昂的性能开销,因此在实践中很少使用。

如果的应用需要处理金融交易,那么可能需要一个提供至少一次或恰好一次语义的消息系统,如Kafka或RabbitMQ。如果的应用需要处理日志数据,那么可能可以接受最多一次语义,因为丢失少量的日志数据通常是可以接受的。

功能需求:不同的消息队列系统提供了不同的功能,如消息过滤、优先级队列、延迟队列等。需要根据的应用需求来选择合适的消息队列。

集成需求:如果的应用已经使用了某个技术栈,那么可能希望选择一个与之兼容的消息队列。例如,如果的应用使用了Spring框架,那么可能会选择RabbitMQ,因为Spring提供了对RabbitMQ的良好支持。

运维需求:不同的消息队列系统在运维上有不同的复杂度。例如,Kafka的运维相对复杂,需要专门的运维团队来维护。而RabbitMQ和ActiveMQ的运维相对简单,适合小团队使用。

成本需求:需要考虑消息队列系统的总体成本,包括硬件成本、软件成本、运维成本等。例如,如果的应用部署在云上,那么可能会选择一个云服务提供商提供的消息队列服务,如Amazon SQS或Google Pub/Sub,因为这样可以降低运维成本

Kafka

Kafka:Apache Kafka它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。号称大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。

优点:

高吞吐量:Kafka设计用于处理大量的实时数据,可以处理数百万条消息/秒。 分布式系统:Kafka集群可以横向扩展,增加更多的节点以处理更多的流量。 持久性:Kafka可以将消息存储在磁盘上,以便在系统崩溃后恢复。 实时处理:Kafka支持实时数据流处理。

缺点:

配置和管理复杂:Kafka的配置参数众多,需要一定的学习成本。同时,Kafka集群的管理也相对复杂。 消息顺序保证:Kafka只能保证同一个分区(Partition)内的消息顺序。

适用场景:大数据处理,实时数据流处理,日志收集等。

RabbitMQ

RabbitMQ:RabbitMQ 2007年发布,是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

优点:

易用性:RabbitMQ的安装和管理相对简单,社区活跃,文档丰富。 灵活的路由:RabbitMQ提供了多种消息路由模式,如直接交换、主题交换、头交换等。 支持多种协议:RabbitMQ支持多种消息队列协议,如AMQP、STOMP、MQTT等。

缺点: 吞吐量相对较低:相比Kafka,RabbitMQ的吞吐量相对较低。 消息堆积:如果消息堆积过多,可能会影响RabbitMQ的性能。

适用场景:复杂的路由场景,实时消息传递,任务队列等。

RocketMQ

阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

优点:

高吞吐量和低延迟:RocketMQ设计用于处理大量的实时数据,同时保证低延迟。 强一致性:RocketMQ支持严格的消息顺序保证。 分布式系统:RocketMQ集群可以横向扩展,增加更多的节点以处理更多的流量。 缺点:

社区活跃度相对较低:相比Kafka和RabbitMQ,RocketMQ的社区活跃度相对较低。

适用场景:金融交易,订单处理,实时数据流处理等。

ActiveMQ

是Apache出品,最流行的,能力强劲的开源消息总线。官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用,所以该消息队列也不是我们文章中重点讨论的内容。

优点: 易用性:ActiveMQ的安装和管理相对简单,社区活跃,文档丰富。 支持JMS:ActiveMQ是一个完全支持JMS(Java Message Service)的消息队列。 缺点:

吞吐量和延迟:相比Kafka和RocketMQ,ActiveMQ的吞吐量和延迟相对较差。 集群扩展性:ActiveMQ的集群扩展性相对较差,不如Kafka和RocketMQ。

适用场景:ActiveMQ适用于企业应用集成,例如,处理业务流程、分布式系统间的消息传递等。

选型总结

如果需要处理大量的实时数据,那么Kafka或RocketMQ可能是一个好选择。如果需要处理复杂的路由场景,那么RabbitMQ可能是一个好选择。如果的应用是基于Java并且需要一个支持JMS的消息队列,那么ActiveMQ可能是一个好选择。

基本概念

Kafka

基本概念:

Producer:消息生产者,负责产生消息并发送到Kafka。 Consumer:消息消费者,从Kafka中读取并处理消息。 Topic:消息的分类,生产者将消息发送到特定的Topic,消费者从特定的Topic中读取消息。 Partition:Topic的分区,每个Topic可以有一个或多个Partition,每个Partition是一个有序的消息队列。 Broker:Kafka服务器,一个Kafka集群由多个Broker组成。

如何使用:Kafka提供了Java API用于生产和消费消息。生产者使用KafkaProducer类发送消息,消费者使用KafkaConsumer类读取消息。也可以使用Kafka的命令行工具进行操作,例如创建Topic、查看Topic信息等。

系统架构:

一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

消息发送过程:

创建Producer:首先,生产者应用程序会创建一个Kafka Producer实例。在创建Producer时,需要提供一些配置参数,如Kafka服务器的地址、消息的序列化方式等。

发送消息:生产者通过调用Producer的send()方法来发送消息。在调用send()方法时,需要提供一个ProducerRecord对象,该对象包含了要发送的Topic、Partition和消息内容。如果没有指定Partition,Kafka会根据消息的Key或者轮询策略来选择一个Partition。

序列化:Kafka Producer会将ProducerRecord中的Key和Value进行序列化。序列化是将对象转换为字节流的过程,这样才能在网络中传输。序列化的方式可以在创建Producer时通过配置参数来指定。

分区:Kafka Producer会根据ProducerRecord中的Key和Topic的Partition策略来选择一个Partition。如果ProducerRecord中指定了Partition,那么就使用指定的Partition。如果没有指定,那么Kafka会根据Key来选择一个Partition。如果Key也没有指定,那么Kafka会轮询所有的Partition。

网络传输:Kafka Producer会将序列化后的消息通过网络发送到对应Partition的Leader Broker。Kafka使用TCP协议进行网络传输,保证了消息的可靠性。

确认:Kafka Broker收到消息后,会将消息写入到本地的日志文件中。然后,Broker会向Producer发送一个确认消息。Producer收到确认消息后,就知道消息已经成功发送。

在实际使用中,为了提高性能,Kafka Producer通常会使用批处理和异步发送的方式。也就是说,Producer会将多个消息打包成一个批次,然后异步地发送到Broker。这样可以减少网络请求的次数,提高消息的吞吐量。

RabbitMQ

基本概念:

Producer:消息生产者,负责产生消息并发送到RabbitMQ。 Consumer:消息消费者,从RabbitMQ中读取并处理消息。 Queue:消息队列,生产者将消息发送到Queue,消费者从Queue中读取消息。 Exchange:交换器,负责接收生产者发送的消息并根据路由规则将消息路由到一个或多个Queue。 如何使用:RabbitMQ提供了多种语言的客户端库,例如Java、Python、.NET等。可以使用这些库中的API来生产和消费消息。例如,在Java中,可以使用Channel类的basicPublish方法发送消息,使用basicConsume方法读取消息。

消息发送过程:

创建连接:首先,生产者应用程序需要创建一个到RabbitMQ服务器的连接。这通常涉及指定RabbitMQ服务器的地址和认证信息。

创建通道:在RabbitMQ中,所有的操作都是在一个叫做”Channel”的概念中进行的。所以,生产者会创建一个Channel。

声明交换器:生产者声明一个交换器(Exchange),并指定交换器的类型(如direct, topic, fanout, headers)。交换器的作用是根据路由规则将消息路由到一个或多个队列。

声明队列:生产者声明一个队列,如果队列不存在,RabbitMQ会自动创建。队列是存储消息的地方。

绑定队列到交换器:生产者将队列绑定到交换器,并指定一个路由键(Routing Key)。路由键是交换器根据规则将消息路由到队列的依据。

发送消息:生产者通过调用basicPublish方法发送消息。在调用此方法时,需要提供交换器名称、路由键和消息内容。消息内容通常是字节流,所以可能需要将的对象序列化为字节流。

关闭通道和连接:发送完消息后,生产者需要关闭Channel和Connection。

在实际使用中,为了提高性能,可能需要使用一些高级特性,如消息确认、事务、发布确认等。

RocketMQ

基本概念:

Producer:消息生产者,负责产生消息并发送到RocketMQ。 Consumer:消息消费者,从RocketMQ中读取并处理消息。 Topic:消息的分类,生产者将消息发送到特定的Topic,消费者从特定的Topic中读取消息。 Broker:RocketMQ服务器,一个RocketMQ集群由多个Broker组成。 如何使用:RocketMQ提供了Java API用于生产和消费消息。生产者使用DefaultMQProducer类发送消息,消费者使用DefaultMQPushConsumer或DefaultMQPullConsumer类读取消息。

消息发送过程:

创建Producer:首先,生产者应用程序会创建一个RocketMQ Producer实例。在创建Producer时,需要提供一些配置参数,如RocketMQ服务器的地址、消息的序列化方式等。

启动Producer:调用Producer的start()方法启动Producer。这个步骤会建立到RocketMQ服务器的网络连接。

发送消息:生产者通过调用Producer的send()方法来发送消息。在调用send()方法时,需要提供一个Message对象,该对象包含了要发送的Topic、Tag和消息内容。

Broker选择:RocketMQ Producer会根据Topic选择一个合适的Broker来发送消息。选择Broker的策略可以在创建Producer时通过配置参数来指定。

网络传输:RocketMQ Producer会将消息通过网络发送到选择的Broker。RocketMQ使用TCP协议进行网络传输,保证了消息的可靠性。

确认:RocketMQ Broker收到消息后,会将消息写入到本地的日志文件中。然后,Broker会向Producer发送一个确认消息。Producer收到确认消息后,就知道消息已经成功发送。

关闭Producer:发送完消息后,生产者需要关闭Producer。这个步骤会关闭到RocketMQ服务器的网络连接。

在实际使用中,为了提高性能,RocketMQ Producer通常会使用批处理和异步发送的方式。也就是说,Producer会将多个消息打包成一个批次,然后异步地发送到Broker。这样可以减少网络请求的次数,提高消息的吞吐量

ActiveMQ

基本概念:

Producer:消息生产者,负责产生消息并发送到ActiveMQ。 Consumer:消息消费者,从ActiveMQ中读取并处理消息。 Queue/Topic:ActiveMQ支持两种消息模型,点对点模型(Queue)和发布订阅模型(Topic)。 如何使用:ActiveMQ提供了JMS(Java Message Service)API用于生产和消费消息。可以使用MessageProducer类发送消息,使用MessageConsumer类读取消息。

消息发送过程:

创建连接工厂:首先,生产者应用程序需要创建一个到ActiveMQ服务器的连接工厂(ConnectionFactory)。这通常涉及指定ActiveMQ服务器的地址和认证信息。

创建连接:使用连接工厂创建一个到ActiveMQ的连接(Connection)。

创建会话:在连接上创建一个会话(Session)。会话是发送和接收消息的上下文。

创建目的地:在会话上创建一个目的地(Destination)。目的地可以是队列(Queue)或主题(Topic),取决于使用的是点对点模型还是发布订阅模型。

创建生产者:在会话上创建一个消息生产者(MessageProducer)。生产者用于发送消息到目的地。

创建消息:创建一个消息(Message)。消息可以是文本消息(TextMessage)、字节消息(BytesMessage)、对象消息(ObjectMessage)等,取决于需要发送的数据类型。

发送消息:生产者调用send()方法发送消息到目的地。

关闭资源:发送完消息后,生产者需要关闭消息、生产者、会话和连接。

在实际使用中,为了提高性能,可能需要使用一些高级特性,如消息持久性、消息优先级、消息过期等。

Tags: 消息队列
Share: X (Twitter) Facebook LinkedIn