本发明涉及一种跨kafka集群的数据转换系统和方法,属于大数据技术领域。
背景技术:
kafka是最初由linkedin公司开发,是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。kafka以集群的方式运行,可以由一个或多个服务组成。kafka集群是一个分布式的、可分区的、可复制的消息系统,能够提供普通消息系统的功能。通俗的讲,kafka是一个日志集群,各种各样的服务器将自身的日志发送到集群中进行统一汇总和存储,然后其他机器从集群中拉去消息进行分析处理,如elt、数据挖掘等。
然而,在kafka集群内部进行数据的流转是没有任何问题的。但是由于kafka架构自身的问题,它本身并不支持跨节点的数据复制。所以在a消息系统的消费者和b消息系统的生产者对接数据时,由于数据格式不一致,会导致b消息系统的生产者接收不到消息。
技术实现要素:
针对上述问题,本发明的目的是提供一种跨kafka集群的数据转换系统和方法,通过对kafka集群间的数据进行自定义格式转换同步功能,使得不同集群间消息数据能够正常流转。
为实现上述目的,本发明采取以下技术方案:
本发明的第一个方面,是提供一种跨kafka集群的数据转换系统,其包括:数据转换模块,所述数据转换模块与需要进行数据交换、且数据格式不同的第一集群和第二集群相连;所述第一集群包括第一生产者端和第一消费者端,所述第一生产者端用于生产消息数据;所述第一消费者端用于对所述第一生产者生产的消息数据进行提取,并发送到所述数据转换模块;所述数据转换模块用于对接收到的消息数据进行数据格式转换,生成满足所述第二集群数据格式要求的消息数据后发送到所述第二集群;所述第二集群包括第二生产者端和第二消费者端,所述第二生产者端用于接收所述数据转换模块发送的消息数据,并在所述第二消费者端进行消息流转。
进一步的,所述数据转换模块包括分类模块、消息体识别模块、映射关系建立模块和格式转换模块;所述分类模块用于根据数据格式类型对数据进行分类,分为关键值、属性值和消息体三类;所述过滤模块用于对所述第一集群的第一消费者端提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据;所述映射关系建立模块用于根据所述第一集群和第二集群的数据格式要求,建立所述第一集群和第二集群中关键值和属性值的对应关系,并发送到所述格式转换模块;所述格式转换模块用于根据所述第一集群和第二集群中关键值和属性值的对应关系,将所述消息数据的消息体数据与所述第二级群对应的关键值和属性值进行组合,得到满足所述第二集群数据格式要求的消息数据。
进一步的,所述过滤模块包括关键值识别模块、属性值识别模块和消息体读取模块;所述关键值识别模块用于对所述第一消费者端提取的消息数据进行关键值提取,并将得到的关键值发送到所述消息体读取模块;所述属性值识别模块用于对所述对所述第一消费者端提取的消息数据进行属性值提取,并将得到的属性值发送到所述消息体读取模块;所述消息体读取模块用于根据确定的关键值和属性值从所述第一消费者端提取的消息数据中提取消息体数据。
进一步的,所述格式转换模块包括消息体数据复制模块、关键值转换模块以及属性值转换模块;所述消息体数据复制模块用于将所述过滤模块得到的消息体数据复制到所述第二集群的第二生产者端;所述关键值转换模块用于根据所述第一集群和第二集群中关键值映射关系,将所述第一集群的关键值转换为所述第二集群的关键值,并将得到的第二集群的关键值与所述消息体数据进行组合,得到初始消息数据;所述属性值转换模块用于根据所述第一集群和第二集群中属性值映射关系,将所述第一集群的属性值转换为所述第二集群的属性值,并将得到的所述第二集群的属性值与所述初始消息数据进行组合,得到符合所述第二集群数据格式要求的数据。
进一步的,所述数据转换模块还包括数据加密模块和数据解密模块,所述数据加密模块设置在所述第一集群的第一消费者端,用于对所述第一集群消费者端提取的消息数据进行加密;所述数据解密模块设置在所述第二集群的第二生产者端,用于对所述第二集群中的第二生产者端接收到的消息数据进行解密。
本发明的第二个方面,是提供一种跨kafka集群的数据转换方法,其包括以下步骤:1)搭建跨kafka集群的数据转换系统,其包括数据转换模块以及与数据转换模块相连的数据格式不同的第一集群和第二集群,所述第一集群和第二集群均包括生产者和消费者,且令第一集群的生产者生产数据,第二集群的生产者接收数据;2)启动第一集群和第二集群的kafka服务,在第一集群的生产者中生产数据;3)第一集群的消费者从第一集群的生产者中提取消息数据,并发送到数据转换模块;4)数据转换模块根据第二集群的数据格式要求,对提取出的消息数据进行格式转换,得到符合第二集群数据格式要求的数据;5)将符合第二集群格式要求的数据发送到第二集群的生产者,供第二集群kafka服务的消息流转使用。
进一步的,所述步骤4)中,数据转换模块根据第二集群的数据格式要求,对提取出的消息数据进行格式转换,得到符合第二集群数据格式要求的数据的方法,包括以下步骤:4.1)根据数据格式类型不同,将数据分为关键值、属性值和消息体三类;4.2)根据第一集群和第二集群的数据格式要求,建立第一集群和第二集群中关键值和属性值的对应关系;4.3)对第一集群的消费者提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据;4.4)根据第一集群和第二集群中关键值和属性值的对应关系,将第一集群的关键值和属性值转换为第二集群的关键值和属性值,并将转换后的关键值和属性值与消息数据的消息体数据进行组合,得到满足第二集群数据格式要求的消息数据。
进一步的,所述步骤4.3)中,对第一集群的消费者提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据的方法,包括以下步骤:4.3.1)对第一集群的消费者提取出的消息数据进行格式识别,得到所述消息数据的关键值和属性值;4.3.2)根据得到的消息数据的关键值和属性值,确定所述消息数据的消息体数据。
进一步的,所述步骤4.4)中,根据第一集群和第二集群中关键值和属性值的对应关系,将第一集群的关键值和属性值转换为第二集群的关键值和属性值,并将转换后的关键值和属性值与消息数据的消息体数据进行组合,得到满足第二集群数据格式要求的消息数据的方法,包括以下步骤:4.4.1)对第一集群的消息体数据进行遍历,将过滤模块得到的消息体数据复制到第二集群的第二生产者端;4.4.2)根据第一集群和第二集群中关键值映射关系,将第一集群的关键值转换为第二集群的关键值,并将得到的第二集群的关键值与消息体数据进行组合,得到初始消息数据;4.4.3)根据第一集群和第二集群中属性值映射关系,将第一集群的属性值转换为第二集群的属性值,并将得到的第二集群的属性值与初始消息数据进行组合,得到符合第二集群数据格式要求的数据。
本发明由于采取以上技术方案,其具有以下优点:本发明提出的跨kafka集群的数据转换系统,使得数据可以直接在kafka间流转、进行数据格式的转换,省去了数据落盘后在调用其他程序进行转换、传输的操作步骤,为业务流程节省了资源和成本的开销。因此,本发明可以广泛应用于kafka集群间的数据转换领域。
附图说明
图1是本发明跨kafka集群数据转换示意图。
具体实施方式
下面结合附图和实施例对本发明进行详细的描述。
本发明提出的一种跨kafka集群的数据转换系统,其包括数据转换模块,该数据转换模块与需要进行数据交换、且数据格式不同的第一集群和第二集群相连。其中,第一集群包括第一生产者端和第一消费者端,第一生产者端用于生产消息数据;第一消费者端用于对第一生产者生产的消息数据进行提取,并发送到数据转换模块;数据转换模块用于对接收到的消息数据进行数据格式转换,生成满足第二集群数据格式要求的消息数据后发送到第二集群;第二集群包括第二生产者端和第二消费者端,第二生产者端用于接收数据转换模块发送的消息数据,并在第二消费者端进行消息流转。
进一步的,数据转换模块包括分类模块、消息体识别模块、映射关系建立模块和格式转换模块;分类模块用于根据数据格式类型对数据进行分类,分为关键值、属性值和消息体三类;映射关系建立模块用于根据第一集群和第二集群的数据格式要求,建立第一集群和第二集群中关键值和属性值的对应关系,并发送到格式转换模块;过滤模块用于对第一集群的第一消费者端提取的消息数据进行过滤,得到消息数据的关键值、属性值和消息体数据;格式转换模块用于根据第一集群和第二集群中关键值和属性值的对应关系,将消息数据的消息体数据与第二级群对应的关键值和属性值进行组合,得到满足第二集群数据格式要求的消息数据。
进一步的,过滤模块包括关键值识别模块、属性值识别模块和消息体读取模块;关键值识别模块用于对第一消费者端提取的消息数据进行关键值提取,并将得到的关键值发送到消息体读取模块;属性值识别模块用于对第一消费者端提取的消息数据进行属性值提取,并将得到的属性值发送到消息体读取模块;消息体读取模块用于根据确定的关键值和属性值从第一消费者端提取的消息数据中提取消息体数据。
进一步的,格式转换模块包括消息体数据复制模块、关键值转换模块以及属性值转换模块;消息体数据复制模块用于将过滤模块得到的消息体数据复制到第二集群的第二生产者端;关键值转换模块用于根据第一集群和第二集群中关键值映射关系,将第一集群的关键值转换为第二集群的关键值,并将得到的第二集群的关键值与消息体数据进行组合,得到初始消息数据;属性值转换模块用于根据第一集群和第二集群中属性值映射关系,将第一集群的属性值转换为第二集群的属性值,并将得到的第二集群的属性值与初始消息数据进行组合,得到符合第二集群数据格式要求的数据。
进一步的,数据转换模块还包括数据加密模块和数据解密模块,其中,数据加密模块设置在第一集群的消费者端,用于对第一集群提取的消息数据进行加密;数据解密模块设置在第二集群的生产者中,用于对第二集群接收到的消息数据进行解密,以保证消息数据的安全性。
如图1所示,本发明提供的一种跨kafka集群的数据转换方法,包括以下步骤:
1)假设需要进行数据交换的两个集群分别为第一集群和第二集群,且第一集群的生产者用于生产数据,第二集群的生产者用于接收数据;
2)启动第一集群和第二集群的kafka服务,在第一集群的生产者中生产数据;
3)第一集群的消费者从第一集群的生产者中提取消息数据;
4)根据第二集群的数据格式要求,对提取出的消息数据进行格式转换,得到符合第二集群数据格式要求的数据;
5)将符合第二集群格式要求的数据发送到第二集群的生产者,供第二集群kafka服务的消息流转使用。
上述步骤4)中,根据第二集群的数据格式要求,对提取出的消息数据进行格式转换,得到符合第二集群数据格式要求的数据的方法,包括以下步骤:
4.1)根据数据格式类型不同,将数据分为关键值、属性值和消息体三类;
4.2)根据第一集群和第二集群的数据格式要求,建立第一集群和第二集群中关键值和属性值的对应关系;
4.3)对第一集群的消费者提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据;
4.4)根据第一集群和第二集群中关键值和属性值的对应关系,将第一集群的关键值和属性值转换为第二集群的关键值和属性值,并将转换后的关键值和属性值与消息数据的消息体数据进行组合,得到满足第二集群数据格式要求的消息数据。
上述步骤4.3)中,对第一集群的消费者提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据的方法,包括以下步骤:
4.3.1)对第一集群的消费者提取出的消息数据进行格式识别,得到所述消息数据的关键值和属性值;
4.3.2)根据得到的消息数据的关键值和属性值,确定所述消息数据的消息体数据。
上述步骤4.4)中,得到满足第二集群数据格式要求的消息数据的方法,包括以下步骤:
4.4.1)对第一集群的消息体数据进行遍历,将过滤模块得到的消息体数据复制到第二集群的第二生产者端;
4.4.2)根据第一集群和第二集群中关键值映射关系,将第一集群的关键值转换为第二集群的关键值,并将得到的第二集群的关键值与消息体数据进行组合,得到初始消息数据;
4.4.3)根据第一集群和第二集群中属性值映射关系,将第一集群的属性值转换为第二集群的属性值,并将得到的第二集群的属性值与初始消息数据进行组合,得到符合第二集群数据格式要求的数据。
下面通过具体实施例对本发明数据格式转换过程进行描述。
假设a集群消费者所提取的数据格式为:
{“before”:{“a1”:”a1_v”,”b1”:”b1_v”,”c1”:”c1_v”},”bb”:”bb_v”,”cc”:”cc_v”}
b集群生产者的数据格式为:
[{“aftercolumns”:{“a1”:”a1_v”,”b1”:”b1_v”,”c1”:”c1_v”},”bb”:”bb_v”,”cc”:”cc_v”}]
首先,根据数据的格式分为key、value两类;然后,对a集群的数据格式进行识别,找到名称为before的key,通过key找到其里面的值的集合,通过这样的筛选过滤原则,可以识别到a集群的数据,此时a集群输出的数据以键值对的形式存放在list集合里面。因为b集群生产者中aftercolumns的数据对应a集群中before的数据。所以,通过采用遍历a集群list集合的方法,把before中的数据集合填充到aftercolumns中。在a集群中消费者的数据中,除key名为before以外,还存在一些其他的属性值,这些属性名称可能与b集群的属性名称命名不一致,所以需要把a集群里消费者对应的值复制到b集群的生产者里面,以供b集群kafka的消息流转使用。当把消息体里面所有的key值都转换完毕后,再在整个消息体的最外层,套一层大括号“[]”,这样就达到了b集群中生产者对于数据格式的要求,进而使得b集群可以直接使用。
以上给出一种具体的实施方式,但本发明不局限于所描述的实施方式。本发明的基本思路在于上述方案,对本领域普通技术人员而言,根据本发明的教导,设计出各种变形的模型、公式、参数并不需要花费创造性劳动。在不脱离本发明的原理和精神的情况下对实施方式进行的变化、修改、替换和变形仍落入本发明的保护范围内。
1.一种跨kafka集群的数据转换系统,其特征在于:其包括:
数据转换模块,所述数据转换模块与需要进行数据交换、且数据格式不同的第一集群和第二集群相连;
所述第一集群包括第一生产者端和第一消费者端,所述第一生产者端用于生产消息数据;所述第一消费者端用于对所述第一生产者生产的消息数据进行提取,并发送到所述数据转换模块;
所述数据转换模块用于对接收到的消息数据进行数据格式转换,生成满足所述第二集群数据格式要求的消息数据后发送到所述第二集群;
所述第二集群包括第二生产者端和第二消费者端,所述第二生产者端用于接收所述数据转换模块发送的消息数据,并在所述第二消费者端进行消息流转。
2.如权利要求1所述的一种跨kafka集群的数据转换系统,其特征在于:所述数据转换模块包括分类模块、消息体识别模块、映射关系建立模块和格式转换模块;
所述分类模块用于根据数据格式类型对数据进行分类,分为关键值、属性值和消息体三类;
所述过滤模块用于对所述第一集群的第一消费者端提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据;
所述映射关系建立模块用于根据所述第一集群和第二集群的数据格式要求,建立所述第一集群和第二集群中关键值和属性值的对应关系,并发送到所述格式转换模块;
所述格式转换模块用于根据所述第一集群和第二集群中关键值和属性值的对应关系,将所述消息数据的消息体数据与所述第二级群对应的关键值和属性值进行组合,得到满足所述第二集群数据格式要求的消息数据。
3.如权利要求2所述的一种跨kafka集群的数据转换系统,其特征在于:所述过滤模块包括关键值识别模块、属性值识别模块和消息体读取模块;
所述关键值识别模块用于对所述第一消费者端提取的消息数据进行关键值提取,并将得到的关键值发送到所述消息体读取模块;
所述属性值识别模块用于对所述对所述第一消费者端提取的消息数据进行属性值提取,并将得到的属性值发送到所述消息体读取模块;
所述消息体读取模块用于根据确定的关键值和属性值从所述第一消费者端提取的消息数据中提取消息体数据。
4.如权利要求2所述的一种跨kafka集群的数据转换系统,其特征在于:所述格式转换模块包括消息体数据复制模块、关键值转换模块以及属性值转换模块;
所述消息体数据复制模块用于将所述过滤模块得到的消息体数据复制到所述第二集群的第二生产者端;
所述关键值转换模块用于根据所述第一集群和第二集群中关键值映射关系,将所述第一集群的关键值转换为所述第二集群的关键值,并将得到的第二集群的关键值与所述消息体数据进行组合,得到初始消息数据;
所述属性值转换模块用于根据所述第一集群和第二集群中属性值映射关系,将所述第一集群的属性值转换为所述第二集群的属性值,并将得到的所述第二集群的属性值与所述初始消息数据进行组合,得到符合所述第二集群数据格式要求的数据。
5.如权利要求2所述的一种跨kafka集群的数据转换系统,其特征在于:所述数据转换模块还包括数据加密模块和数据解密模块,所述数据加密模块设置在所述第一集群的第一消费者端,用于对所述第一集群消费者端提取的消息数据进行加密;所述数据解密模块设置在所述第二集群的第二生产者端,用于对所述第二集群中的第二生产者端接收到的消息数据进行解密。
6.一种跨kafka集群的数据转换方法,其特征在于包括以下步骤:
1)搭建跨kafka集群的数据转换系统,其包括数据转换模块以及与数据转换模块相连的数据格式不同的第一集群和第二集群,所述第一集群和第二集群均包括生产者和消费者,且令第一集群的生产者用于生产数据,第二集群的生产者用于接收数据;
2)启动第一集群和第二集群的kafka服务,在第一集群的生产者中生产数据;
3)第一集群的消费者从第一集群的生产者中提取消息数据,并发送到数据转换模块;
4)数据转换模块根据第二集群的数据格式要求,对提取出的消息数据进行格式转换,得到符合第二集群数据格式要求的数据;
5)将符合第二集群格式要求的数据发送到第二集群的生产者,供第二集群kafka服务的消息流转使用。
7.如权利要求6所述的一种跨kafka集群的数据转换方法,其特征在于:所述步骤4)中,数据转换模块根据第二集群的数据格式要求,对提取出的消息数据进行格式转换,得到符合第二集群数据格式要求的数据的方法,包括以下步骤:
4.1)根据数据格式类型不同,将数据分为关键值、属性值和消息体三类;
4.2)根据第一集群和第二集群的数据格式要求,建立第一集群和第二集群中关键值和属性值的对应关系;
4.3)对第一集群的消费者提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据;
4.4)根据第一集群和第二集群中关键值和属性值的对应关系,将第一集群的关键值和属性值转换为第二集群的关键值和属性值,并将转换后的关键值和属性值与消息数据的消息体数据进行组合,得到满足第二集群数据格式要求的消息数据。
8.如权利要求7所述的一种跨kafka集群的数据转换方法,其特征在于:所述步骤4.3)中,对第一集群的消费者提取的消息数据进行过滤,得到所述消息数据的关键值、属性值和消息体数据的方法,包括以下步骤:
4.3.1)对第一集群的消费者提取出的消息数据进行格式识别,得到所述消息数据的关键值和属性值;
4.3.2)根据得到的消息数据的关键值和属性值,确定所述消息数据的消息体数据。
9.如权利要求7所述的一种跨kafka集群的数据转换方法,其特征在于:所述步骤4.4)中,根据第一集群和第二集群中关键值和属性值的对应关系,将第一集群的关键值和属性值转换为第二集群的关键值和属性值,并将转换后的关键值和属性值与消息数据的消息体数据进行组合,得到满足第二集群数据格式要求的消息数据的方法,包括以下步骤:
4.4.1)对第一集群的消息体数据进行遍历,将过滤模块得到的消息体数据复制到第二集群的第二生产者端;
4.4.2)根据第一集群和第二集群中关键值映射关系,将第一集群的关键值转换为第二集群的关键值,并将得到的第二集群的关键值与消息体数据进行组合,得到初始消息数据;
4.4.3)根据第一集群和第二集群中属性值映射关系,将第一集群的属性值转换为第二集群的属性值,并将得到的第二集群的属性值与初始消息数据进行组合,得到符合第二集群数据格式要求的数据。
技术总结