900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > @transaction 提交事务_Kafka 事务实现原理

@transaction 提交事务_Kafka 事务实现原理

时间:2019-07-31 16:22:26

相关推荐

@transaction 提交事务_Kafka 事务实现原理

Kafka 事务实现原理

Kafka 事务在流处理中应用很广泛,比如原子性的读取消息,立即处理和发送,如果中途出现错误,支持回滚操作。这篇文章来讲讲事务是如何实现的,首先来看看事务流程图。

事务流程

Kafka的整个事务处理流程如下图:

上图中的 Transaction Coordinator 运行在 Kafka 服务端,下面简称 TC 服务。

__transaction_state 是 TC 服务持久化事务信息的 topic 名称,下面简称事务 topic。

Producer 向 TC 服务发送的 commit 消息,下面简称事务提交消息。

TC 服务向分区发送的消息,下面简称事务结果消息。

寻找 TC 服务地址

Producer 会首先从 Kafka 集群中选择任意一台机器,然后向其发送请求,获取 TC 服务的地址。Kafka 有个特殊的事务 topic,名称为__transaction_state ,负责持久化事务消息。这个 topic 有多个分区,默认有50个,每个分区负责一部分事务。事务划分是根据 transaction id, 计算出该事务属于哪个分区。这个分区的 leader 所在的机器,负责这个事务的TC 服务地址。

事务初始化

Producer 在使用事务功能,必须先自定义一个唯一的 transaction id。有了 transaction id,即使客户端挂掉了,它重启后也能继续处理未完成的事务。

Kafka 实现事务需要依靠幂等性,而幂等性需要指定 producer id 。所以Producer在启动事务之前,需要向 TC 服务申请 producer id。TC 服务在分配 producer id 后,会将它持久化到事务 topic。

发送消息

Producer 在接收到 producer id 后,就可以正常的发送消息了。不过发送消息之前,需要先将这些消息的分区地址,上传到 TC 服务。TC 服务会将这些分区地址持久化到事务 topic。然后 Producer 才会真正的发送消息,这些消息与普通消息不同,它们会有一个字段,表示自身是事务消息。

这里需要注意下一种特殊的请求,提交消费位置请求,用于原子性的从某个 topic 读取消息,并且发送消息到另外一个 topic。我们知道一般是消费者使用消费组订阅 topic,才会发送提交消费位置的请求,而这里是由 Producer 发送的。Producer 首先会发送一条请求,里面会包含这个消费组对应的分区(每个消费组的消费位置都保存在 __consumer_offset topic 的一个分区里),TC 服务会将分区持久化之后,发送响应。Producer 收到响应后,就会直接发送消费位置请求给 GroupCoordinator。

发送提交请求

Producer 发送完消息后,如果认为该事务可以提交了,就会发送提交请求到 TC 服务。Producer 的工作至此就完成了,接下来它只需要等待响应。这里需要强调下,Producer 会在发送事务提交请求之前,会等待之前所有的请求都已经发送并且响应成功。

提交请求持久化

TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每 个分区生成提交请求,存到队列里等待发送。

读者可能有所疑问,在一般的二阶段提交中,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。那如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了,那么 Kafka 是如何保证事务完成。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。

发送事务结果信息给分区

后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。

客户端原理

使用示例

下面代码实现,消费者读取消息,并且发送到多个分区的事务

运行原理

上面的例子使用了 Producer的接口实现了事务,但负责与 TC 服务通信的是 TransactionManager 类。TransactionManager 类会发送申请分配 producer id 请求,上传消息分区请求和事务提交请求,在完成每一步请求,TransactionManager 都会更新自身的状态。

状态

这里还有两个状态没有列出来 ABORTABLE_ERROR或FATAL_ERROR,这是当请求出错后,状态就会变为它们。

服务端原理

TC 服务会为每个 transaction id 都维护了元数据,元数据的字段如下:

对于服务端,每个事务也有对应的状态

当 TC 服务接收到了来自客户端的分区上传请求,此时它才会认为此次事务开始了,然后它会更新分区列表,更新此次的事务开始时间为当前时间,并且会将更新后的元数据,持久化到事务 topic。最后将自身状态改为 Ongoing。

当TC 服务收到事务提交请求或者事务回滚请求,更新元数据,持久化到事务 topic,然后自身状态改为CompleteCommit 或CompleteAbort 。然后向涉及到该事务的分区发送事务结果消息,等待所有的分区都成功返回响应后,就会持久化一条事务成功的消息到消息 topic。

高可用分析

TC 服务

通过上述对 Kafka 事务的简述,可以看到 TC 服务起着很重要的作用。事实上 Kafka 集群中运行着多个 TC 服务,每个TC 服务负责事务 topic 的一个分区读写,也就是这个分区的 leader。Producer 根据 transaction id 的哈希值,来决定该事务属于事务 topic 的哪个分区,最后找到这个分区的 leader 位置。

既然 TC 服务负责事务 topic 的一个分区 leader,我们知道当一个分区的 leader挂掉之后,Kafka 会保证这个的分区的 follower 会转换为 leader 角色,会继续对外提供服务。这么 TC 服务的高可用就达到了。

消息持久化

TC 服务为了支持重启后,也能恢复到之前的状态,所以它将每次重要的消息都会持久化起来,并且保存到事务 topic 的时候,指定 leader 分区和 follower 分区必须都存储成功。这样每次 TC 服务启动的时候,都会从事务 topic 读取之前的状态,加载到缓存里。比如当TC 服务在响应客户端的事务提交请求后,还没来得及向各分区发送事务结果请求,就已经挂掉了。之后 TC 服务重启,会去事务 topic 加载数据,它发现事务的最后状态为 PrepareCommit,并且事务数据还包括了分区列表,这样 TC 服务会继续未完成的事务,会向列表中的各个分区发送事务结果请求。

超时处理

如果 Producer 发起了一个事务,但是由于网络问题,TC 服务迟迟没有接下来的请求,那么该事务就会被认为超时。TC 服务会有个线程,会定期检查处理 Ongoing 状态的事务,如果该事务的开始时间和当前时间的差,超过了指定的超时时间(在发送申请producer id请求时可以指定),那么 TC 服务就会回滚该事务,更新和持久化事务的状态,并且发送事务回滚结果给分区。

源码分析

如果对源码还有兴趣的读者,可以继续阅读这部分。这里会大概的讲解下代码结构,读者如果想进一步的理解,可以参看源码。整个事务的源码分为两部分,客户端和服务端。

客户端

事务的客户端,只能是 Producer。下面首先介绍下 Producer 与事务相关的接口。

KafkaProducer 类实现了 Producer 接口,比较简单,只是调用了 TransactionCoordinator 类的方法。客户端事务处理的核心代码,都是在 TransactionCoordinator 类里。

TransactionCoordinator 发送的请求类,都有一个对应的类来处理响应。这些处理类都是继承 TxnRequestHandler 类,它封装了共同的错误处理,比如连接断开,api 版本不兼容等。子类需要实现 handleResponse 方法,负责处理具体的响应内容。

initializeTransactions 方法负责事务初始化,它会发送 InitProducerIdRequest 请求。

InitProducerIdHandler 类的定义如下:

beginTransaction 方法负责开始新事务,它只是更改自身状态为 IN_TRANSACTION,并不会发送任何请求

我们知道Producer发送消息,都是先将消息发送到缓存队列里,最后是由Sender线程发送出去 。Producer 如果开启了事务, 它在发送消息到缓存之前,会将消息所在的分区保存在 TransactionCoordinator 里。然后Sender线程在发送消息之前,会去从 TransactionCoordinator 检查是否需要上次分区到 TC 服务,如果有就先上次分区,随后才发送消息。

TransactionManager 提供了 maybeAddPartitionToTransaction 方法添加分区。

TransactionManager 的 addPartitionsToTransactionHandler 方法,会生成分区上传请求,然后由Sender发送。

AddPartitionsToTxnHandler 负责处理响应

sendOffsetsToTransaction 方法负责发送消费位置提交请求

AddOffsetsToTxnHandler 类负责处理响应,它的处理逻辑很简单,它收到响应后,会发送 TxnOffsetCommitRequest 请求给 TC 服务。

最后还剩下事务提交或回滚请求,还没讲述。Producer 在调用 commitTransaction 或 abortTransaction 方法,本质都是调用了 TransactionManager 的 beginCompletingTransaction 方法发送请求。

EndTxnHandler 负责处理事务提交或回滚响应,EndTxnHandler的处理逻辑比较简单,它只是调用了 completeTransaction 方法。

服务端

服务端的结构会相对复杂一些,这里尽量简单的讲讲大概逻辑。首先介绍下 TransactionStateManager 类,它负责管理事务的元数据,它也提供持久化事务的元数据,和从事务 topic 加载数据的功能。

TransactionStateManager 提供了 appendTransactionToLog 方法用于持久化。

TransactionStateManager 提供了 loadTransactionsForTxnTopicPartition 方法用于从消息 topic 恢复数据,这里不再详细介绍。

接下来来讲讲 TransactionCoordinator 类,它负责处理重要的事务请求。

handleInitProducerId 方法会返回 producer id,如果这个事务的 transaction id 第一次请求,那么会为它分配新的 producer id 。如果之前请求过,就会返回之前分配的 producer id。

handleAddPartitionsToTransaction 方法会将上传的分区列表,添加到元数据并且持久化。

handleEndTransaction 方法会稍微复杂一些,因为它需要将这个消息转发给各个分区。

上面发送请求到分区,调用了 TransactionMarkerChannelManager 的方法。它会生成每个分区的请求,然后放到一个队列里,后台线程会负责将这些请求发送出去。当收到所有分区的响应后,它还负责更改事务的状态,并且负责持久化一条事务成功的消息。这里需要提下延迟任务 DelayedTxnMarker,它负责检查是否收到所有分区的响应。它设置的延迟时间达到365天,所以可以认为次任务几乎不会过期。

DelayedTxnMarker 是在 TransactionMarkerChannelManager 的 addTxnMarkersToSend 方法中实例化的,它的 completionCallback 参数,就是定义在 addTxnMarkersToSend 方法里面。

·END·

关于21CTO

是中国互联网第一技术社交与学习平台。为CTO、技术总监,技术专家,架构师、技术经理,高级研发工程师、PM等提供学习成长,教育培训,工作机会、人脉影响力等产品,同时为企业提供人才招聘,企业培训,软件研发等服务。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。