本申请实施例涉及计算机技术领域,具体涉及数据洗牌方法和装置。
背景技术:
通常,spark社区的continuousprocessing(连续处理)模式只支持仅例如map(映射)、filter(过滤)等map-only操作的作业,不支持例如groupby(分组)、repartition(再分区)、window(窗口函数)等shuffle(数据洗牌)操作的作业,这严重限制了用户的使用场景。因此,在continuousprocessing模式中支持shuffle操作是完善continuousprocessing的必经之路。
目前,已经实现过一个基于spark原生shuffle系统的continuousprocessingshuffle操作。但spark原生shuffle的数据在map端和reduce(归约)端进行传输时会写本地文件系统,这部分开销影响了数据的传输速率,从而增加了数据的传输延迟。
技术实现要素:
本申请实施例提出了数据洗牌方法和装置。
第一方面,本申请实施例提出了一种数据洗牌方法,包括:获取多级阶段;从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。
在一些实施例中,从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据,包括:对于多级阶段中的相邻两级阶段,从相邻两级阶段中的映射端的基于远程过程调用数据洗牌的持续写模块向相邻两级阶段中的归约端的基于远程过程调用数据洗牌的持续读模块写入数据,其中,映射端是相邻两级阶段中的上级阶段,归约端是相邻两级阶段中的下级阶段。
在一些实施例中,多级阶段包括第一级阶段、中间级阶段和最后一级阶段;以及对于多级阶段中的相邻两级阶段,从相邻两级阶段中的映射端的基于远程过程调用数据洗牌的持续写模块向相邻两级阶段中的归约端的基于远程过程调用数据洗牌的持续读模块写入数据,包括:从第一级阶段的基于远程过程调用数据洗牌的持续写模块向中间级阶段的基于远程过程调用数据洗牌的持续读模块写入数据;通过中间级阶段的基于远程过程调用数据洗牌的持续写模块从中间级阶段的基于远程过程调用数据洗牌的持续读模块读取数据,以及向最后一级阶段的基于远程过程调用数据洗牌的持续读模块写入数据。
在一些实施例中,多级阶段是有向无环图调度器通过如下步骤提交到集群中的:按照多级阶段的顺序依次提交各级阶段。
在一些实施例中,按照多级阶段的顺序依次提交各级阶段,包括:通过在有向无环图调度器中增加用于一次提交所有阶段的方法提交多级阶段。
在一些实施例中,从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据,包括:通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将终端地址注册到驱动器的处理连续模型数据洗牌操作的协调器中;通过多级阶段中的基于远程过程调用数据洗牌的持续写模块从处理连续模型数据洗牌操作的协调器中获取终端地址,以及基于终端地址将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在一些实施例中,通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将终端地址注册到驱动器的处理连续模型数据洗牌操作的协调器中,包括:在驱动器中增加处理连续模型数据洗牌操作的协调器;在执行器中创建多级阶段中的基于远程过程调用数据洗牌的持续读模块;通过远程过程调用请求将终端地址注册到处理连续模型数据洗牌操作的协调器中。
在一些实施例中,在执行器中创建多级阶段中的基于远程过程调用数据洗牌的持续读模块,包括:基于终端地址的名称和分区标识从处理连续模型数据洗牌操作的协调器中获取终端地址的引用实例,其中,处理连续模型数据洗牌操作的协调器中存储终端地址名称映射和终端地址分区映射;创建封装连续远程过程调用数据洗牌逻辑的弹性分布式数据集;通过弹性分布式数据集对终端地址对应的分区创建基于远程过程调用数据洗牌的持续读模块。
在一些实施例中,在通过多级阶段中的基于远程过程调用数据洗牌的持续写模块从处理连续模型数据洗牌操作的协调器中获取终端地址之前,还包括:创建连续模型远程过程调用数据洗牌中映射阶段的任务;在连续模型远程过程调用数据洗牌中映射阶段的任务中创建多级阶段中的基于远程过程调用数据洗牌的持续写模块;以及基于终端地址将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中,包括:通过远程过程调用请求将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在一些实施例中,在连续模型远程过程调用数据洗牌中映射阶段的任务中创建多级阶段中的基于远程过程调用数据洗牌的持续写模块之后,还包括:取消多级阶段中的基于远程过程调用数据洗牌的持续写模块对下一级分区个数的限制。
在一些实施例中,该方法还包括:在连续处理模式下,通过水印协调器汇报和更新数据的水印,以及处理数据中的过期数据。
在一些实施例中,通过水印协调器汇报和更新数据的水印,包括:创建水印协调器;在驱动器中统计从执行器汇报的最大事件时间值,以及计算全局水印值;对用于处理基于事件时间的水印的代码逻辑进行修改,以使处于连续处理模式下;汇报和计算水印,以及更新计算时依赖的水印值。
第二方面,本申请实施例提出了一种数据洗牌装置,包括:获取单元,被配置成获取多级阶段;写入单元,被配置成从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;对齐单元,被配置成通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。
在一些实施例中,写入单元包括:写入子单元,被配置成对于多级阶段中的相邻两级阶段,从相邻两级阶段中的映射端的基于远程过程调用数据洗牌的持续写模块向相邻两级阶段中的归约端的基于远程过程调用数据洗牌的持续读模块写入数据,其中,映射端是相邻两级阶段中的上级阶段,归约端是相邻两级阶段中的下级阶段。
在一些实施例中,多级阶段包括第一级阶段、中间级阶段和最后一级阶段;以及写入子单元包括:第一写入模块,被配置成从第一级阶段的基于远程过程调用数据洗牌的持续写模块向中间级阶段的基于远程过程调用数据洗牌的持续读模块写入数据;第二写入模块,被配置成通过中间级阶段的基于远程过程调用数据洗牌的持续写模块从中间级阶段的基于远程过程调用数据洗牌的持续读模块读取数据,以及向最后一级阶段的基于远程过程调用数据洗牌的持续读模块写入数据。
在一些实施例中,多级阶段是有向无环图调度器通过如下步骤提交到集群中的:按照多级阶段的顺序依次提交各级阶段。
在一些实施例中,按照多级阶段的顺序依次提交各级阶段,包括:通过在有向无环图调度器中增加用于一次提交所有阶段的方法提交多级阶段。
在一些实施例中,写入单元包括:注册子单元,被配置成通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将终端地址注册到驱动器的处理连续模型数据洗牌操作的协调器中;获取及写入子单元,被配置成通过多级阶段中的基于远程过程调用数据洗牌的持续写模块从处理连续模型数据洗牌操作的协调器中获取终端地址,以及基于终端地址将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在一些实施例中,注册子单元包括:增加模块,被配置成在驱动器中增加处理连续模型数据洗牌操作的协调器;创建模块,被配置成在执行器中创建多级阶段中的基于远程过程调用数据洗牌的持续读模块;注册模块,被配置成通过远程过程调用请求将终端地址注册到处理连续模型数据洗牌操作的协调器中。
在一些实施例中,创建模块进一步被配置成:基于终端地址的名称和分区标识从处理连续模型数据洗牌操作的协调器中获取终端地址的引用实例,其中,处理连续模型数据洗牌操作的协调器中存储终端地址名称映射和终端地址分区映射;创建封装连续远程过程调用数据洗牌逻辑的弹性分布式数据集;通过弹性分布式数据集对终端地址对应的分区创建基于远程过程调用数据洗牌的持续读模块。
在一些实施例中,写入单元还包括:第一创建子单元,被配置成创建连续模型远程过程调用数据洗牌中映射阶段的任务;第二创建子单元,被配置成在连续模型远程过程调用数据洗牌中映射阶段的任务中创建多级阶段中的基于远程过程调用数据洗牌的持续写模块;以及获取及写入子单元进一步被配置成:通过远程过程调用请求将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在一些实施例中,写入单元还包括:取消子单元,被配置成取消多级阶段中的基于远程过程调用数据洗牌的持续写模块对下一级分区个数的限制。
在一些实施例中,该装置还包括:更新及汇报单元,被配置成在连续处理模式下,通过水印协调器汇报和更新数据的水印,以及处理数据中的过期数据。
在一些实施例中,更新及汇报单元进一步被配置成:创建水印协调器;在驱动器中统计从执行器汇报的最大事件时间值,以及计算全局水印值;对用于处理基于事件时间的水印的代码逻辑进行修改,以使处于连续处理模式下;汇报和计算水印,以及更新计算时依赖的水印值。
第三方面,本申请实施例提供了一种电子设备,该电子设备包括:一个或多个处理器;存储装置,其上存储有一个或多个程序;当一个或多个程序被一个或多个处理器执行,使得一个或多个处理器实现如第一方面中任一实现方式描述的方法。
第四方面,本申请实施例提供了一种计算机可读介质,其上存储有计算机程序,该计算机程序被处理器执行时实现如第一方面中任一实现方式描述的方法。
本申请实施例提供的数据洗牌方法和装置,首先获取多级阶段;然后从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;最后通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。通过远程过程调用实现了在连续处理模式下的数据洗牌操作,提高了数据洗牌阶段数据的传输速率,降低了数据的传输时延。
附图说明
通过阅读参照以下附图所作的对非限制性实施例所作的详细描述,本申请的其它特征、目的和优点将会变得更明显:
图1是本申请可以应用于其中的示例性系统架构;
图2是根据本申请的数据洗牌方法的一个实施例的流程图;
图3是根据本申请的数据洗牌方法的又一个实施例的流程图;
图4是两级阶段的结构示意图;
图5是根据本申请的数据洗牌装置的一个实施例的结构示意图;
图6是适于用来实现本申请实施例的电子设备的计算机系统的结构示意图。
具体实施方式
下面结合附图和实施例对本申请作进一步的详细说明。可以理解的是,此处所描述的具体实施例仅仅用于解释相关发明,而非对该发明的限定。另外还需要说明的是,为了便于描述,附图中仅示出了与有关发明相关的部分。
需要说明的是,在不冲突的情况下,本申请中的实施例及实施例中的特征可以相互组合。下面将参考附图并结合实施例来详细说明本申请。
图1示出了可以应用本申请的数据洗牌方法或数据洗牌装置的实施例的示例性系统架构100。
如图1所示,系统架构100中可以包括有向无环图调度器101和集群102。其中,集群102中包括机器1021、1022、1023、1024。有向无环图调度器101可以将多级阶段统一提交到集群102中。集群102可以在多级阶段中进行数据写入。
应该理解,图1中的向无环图调度器和集群,以及集群中包括的机器的数目仅仅是示意性的。根据实现需要,可以具有任意数目的向无环图调度器和集群,以及集群中包括的机器。
继续参考图2,其示出了根据本申请的数据洗牌方法的一个实施例的流程200。该数据洗牌方法包括以下步骤:
步骤201,获取多级阶段。
在本实施例中,集群可以获取多级stage(阶段)。通常,dagscheduler(有向无环图调度器)可以将多级stage提交到集群中。其中,多级stage可以包括至少两个stage。每个stage会映射成不同的任务,多级stage提交就是这些stage对应的任务全部提交到集群中。并且,多级stage可能被提交到集群中的同一台机器中,也可能被提交到集群中的不同机器中,同一机器上可以包括多级stage中的一个或多个stage。
在本实施例的一些可选的实现方式中,dagscheduler可以按照多级stage的顺序依次提交各级stage。通常,当上级stage提交并执行结束后,其下级stage才会被提交。而在continuousprocessing模式下,每级stage都会一直不停的运行,不再会有stage结束的过程。因此,需要在作业启动之初就将所有stage全部提交上去。
在本实施例的一些可选的实现方式中,通过在dagscheduler中增加一个新方法submitallstagesonce(finalstage:resultstage)可以一次提交多级stage。其中,submitallstagesonce(finalstage:resultstage)是用于一次提交所有阶段的方法,其伪代码如下:
privatedefsubmitallstagesonce(finalstage:resultstage){
valstages=getallancestorstages(finalstage)//获取所有stage
valjobid=activejobforstage(finalstage)//生成一个jobid
loginfo(s"startsubmitting${stages.size}stagesforcontinuousprocessing,jobid:${jobid}")//打印日志信息
for(stage<-stages){
loginfo("submitting" stage "(" stage.rdd ")")
submitmissingtasks(stage,jobid.get)//通过submitmissingtasks接口依次将每级stage进行提交
}
}
步骤202,从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据。
在本实施例中,从多级stage中的rpccontinuousshufflewriter(基于远程过程调用数据洗牌的持续写模块)可以向多级stage中的rpccontinuousshufflereader(基于远程过程调用数据洗牌的持续读模块)写入数据。通常,按照多级stage的顺序,上级stage中的rpccontinuousshufflewriter可以将数据写入其下级stage中的rpccontinuousshufflereader中。
在本实施例的一些可选的实现方式中,shuffle可以是多级stage之间的数据传输的过程。通常,对于多级stage中的相邻两级stage,从相邻两级stage中的map端的rpccontinuousshufflewriter可以向相邻两级stage中的reduce端的rpccontinuousshufflereader写入数据。其中,多级stage中的相邻两级stage之间可以通过rpccontinuousshufflewriter和rpccontinuousshufflereader写入数据。相邻两级stage之间相互依赖,其中的上级stage又叫做上游stage或父stage,其中的下级stage又叫做下游stage或子stage。包括rpccontinuousshufflewriter的stage是其下级stage对应的map端,包括rpccontinuousshufflereader的stage是其上级stage对应的reduce端。
在本实施例的一些可选的实现方式中,多级stage可以包括第一级stage、中间级stage和最后一级stage。从第一级stage的rpccontinuousshufflewriter可以向中间级stage的rpccontinuousshufflereader写入数据;通过中间级stage的rpccontinuousshufflewriter可以从中间级stage的rpccontinuousshufflereader读取数据,以及向最后一级stage的rpccontinuousshufflereader写入数据。通常,多级stage中的第一级stage仅包括rpccontinuousshufflewriter,是其下级stage对应的map端。多级stage中的最后一级stage仅包括rpccontinuousshufflereader,是其上级stage对应的reduce端。多级stage中的中间级stage同时包括rpccontinuousshufflewriter和rpccontinuousshufflereader。中间级stage是其上级stage对应的reduce端,通过rpccontinuousshufflereader从上级stage读取数据。同时,中间级stage是其下级stage对应的map端,通过rpccontinuousshufflewriter向其下级stage写入数据。
在本实施例的一些可选的实现方式中,由于多级stage可能被提交到集群中的不同机器中。因此,每级stage中的rpccontinuousshufflewriter需要获取其下级stage中的rpccontinuousshufflereader的endpoint(终端地址),才能向其下级stage中的rpccontinuousshufflereader写入数据。将rpccontinuousshufflereader的endpoint注册到driver(驱动器)中,可以方便rpccontinuousshufflewriter获取相应的rpccontinuousshufflereader的endpoint。具体地,通过多级stage中的rpccontinuousshufflereader可以将endpoint注册到driver的continuousshufflecoordinator(处理连续模型数据洗牌操作的协调器)中;通过多级stage中的rpccontinuousshufflewriter可以从continuousshufflecoordinator中获取endpoint,以及基于endpoint将数据写入多级stage中的rpccontinuousshufflereader的缓存中。其中,rpccontinuousshufflereader实际是一个threadsaferpcendpoint(线程安全的远程过程调用终端地址),且包含一个缓存数据的缓存。
步骤203,通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。
在本实施例中,通过多级stage中的rpccontinuousshufflereader可以将数据对齐。通常,为了保证数据的正确性,在每级stage中需要保证epoch的对齐。其中,一个epoch对应一批数据。在流式计算中,数据源源不断地流入系统,系统将这些永无止尽地数据流切分成一个个epoch,一个epoch是整个数据流中的一段数据的集合。
在本实施例的一些可选的实现方式中,在连续处理模式下,通过watermarkcoordinator(水印协调器)可以汇报和更新数据的watermark(水印),以及处理数据中的过期数据。具体地,汇报和更新数据的watermark的步骤如下:
首先,创建watermarkcoordinator。
之后,在driver中统计从executor(执行器)汇报的maxeventtime(最大事件时间值),以及计算全局watermark值。
然后,对eventtimewatermarkexec(用于处理基于事件时间的水印的代码逻辑)进行修改,以使处于continuousprocessing模式下。
通常,watermarkcoordinator在汇报watermark时,需要对eventtimewatermarkexec进行修改,以使处于continuousprocessing模式下。
最后,汇报和计算watermark,以及更新计算时依赖的watermark值。
其中,statestoresaveexec(用于处理状态存储的代码逻辑)是具体使用watermark的类,其通过child.execute().mappartitionswithstatestore(对每个partition中的数据单独进行处理且可以操作状态数据的函数),实际生成了一个statestorerdd(可以操作状态数据的rdd)。该rdd接收一个函数(statestore,iterator[t])=>iterator[u]作为参数,每个epoch被计算时会调用该函数,函数中会使用watermark对数据进行处理。因此,需要在该函数的开始,从watermarkcoordinator获取最新的watermark值并应用于计算。
本申请实施例提供的数据洗牌方法和装置,首先获取多级阶段;然后从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;最后通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。通过远程过程调用实现了在连续处理模式下的数据洗牌操作,提高了数据洗牌阶段数据的传输速率,降低了数据的传输时延。
进一步参考图3,其示出了根据本申请的数据洗牌方法的又一个实施例的流程300。该数据洗牌方法包括以下步骤:
步骤301,获取多级阶段。
在本实施例中,步骤301的具体操作已在图2所示的实施例中步骤201中进行了详细的介绍,在此不再赘述。
步骤302,在驱动器中增加处理连续模型数据洗牌操作的协调器。
在本实施例中,在driver中可以增加一个新类continuousshufflecoordinator。其中,continuousshufflecoordinator实现的部分伪代码如下:
private[continuous]classcontinuousshufflecoordinatorextends
threadsaferpcendpointwithlogging{//继承threadsaferpcendpoint类
//记录shuffleid到[endpointname,endpointref])的映射
//其中映射的值又是endpointname到endpointref的一个映射valendpointnamemap:
concurrenthashmap[integer,map[string,rpcendpointref]].asscala
//记录shuffleid到[partitionid,endpointref])的映射
//其中映射的值又是partitionid到endpointref的一个映射valendpointpartitionmap:
concurenthashmap[integer,sortedmap[int,rpcendpointref]].asscala
//异步rpc,不做任何事
overridedefreceive:partialfunction[any,unit]={
//不做任何事
}
//同步rpc,接收rpc请求,并同步返回结果
overridedefreceiveandreply(context:rpccallcontext):partialfunction[any,unit]={
//接收到rpc请求消息类型是addendpoint消息
caseaddendpoint(sshuffleid,name,ref,partitionid)=>
//对endpointnamemap进行更新
valnamemap=endpointnamemap.getorelseupdate(shuffleid,
newhashmap[string,rpcendpointref]())
namemap.add(name,ref)
//对endpointpartitionmap进行更新
valpartitionmap=endpointnamemap.getorelseupdate(shuffleid,
newsortedmap[int,rpcendpointref]())
partitionmap.add(partitionid,ref)
//接收到rpc请求消息类型是getendpoint消息
casegetendpoint(shuffleid,name)=>
//根据shuffleid和endpointname,从endpointname中获取相应的endpointref并返回
valref=endpointname(shuffleid).get(name)
context.reply(ref)
//接收到的rpc请求消息类型是getallendpoints消息
casegetallendpoints(shuffleid)=>
//根据shuffleid获取所有分区对应的endpoint的映射
valref=endpointpartitionmap(shuffleid).values.toarray()
}
步骤303,在执行器中创建多级阶段中的基于远程过程调用数据洗牌的持续读模块。
在本实施例中,在executor中可以创建多级stage中的rpccontinuousshufflereader。
在本实施例的一些可选的实现方式中,首先基于endpointname(终端地址的名称)和partitionid(分区标识)从continuousshufflecoordinator中获取endpoint(rpcendpointref)(终端地址的一个引用实例);随后创建一个continuousrpcshuffledrdd(封装连续远程过程调用数据洗牌逻辑的弹性分布式数据集);通过continuousrpcshuffledrdd可以对endpoint对应的paritition(分区)创建rpccontinuousshufflereader。其中,continuousshufflecoordinator中可以存储endpointnamemap(终端地址名称映射)和endpointpartitionmap(终端地址分区映射)。
步骤304,通过远程过程调用请求将终端地址注册到处理连续模型数据洗牌操作的协调器中。
在本实施例中,通过rpc(远程过程调用)请求可以将endpoint注册到continuousshufflecoordinator。其中,伪代码如下:
步骤305,创建连续模型远程过程调用数据洗牌中映射阶段的任务。
在本实施例中,创建一个新类continuousrpcshufflemaptask(连续模型远程过程调用数据洗牌中映射阶段的任务)。
步骤306,在连续模型远程过程调用数据洗牌中映射阶段的任务中创建多级阶段中的基于远程过程调用数据洗牌的持续写模块。
在本实施例中,在continuousrpcshufflemaptask中可以创建rpccontinuousshufflewriter。
目前,rpccontinuousshufflewriter的outputpartitioner.numpartition(下级分区个数)通常只能等于1。在本实施例的一些可选的实现方式中,通过对rpccontinuousshufflewriter进行修改,可以取消多级stage中的rpccontinuousshufflewriter对outputpartitioner.numpartition的限制。这样,一个rpccontinuousshufflewriter就可以向多个rpccontinuousshufflereader写入数据。
步骤307,从处理连续模型数据洗牌操作的协调器中获取终端地址。
在本实施例中,多级stage中的rpccontinuousshufflewriter可以从continuousshufflecoordinator中获取endpoint。
需要说明的是,rpccontinuousshufflereader向continuousshufflecoordinator注册endpoint与rpccontinuousshufflewriter从continuousshufflecoordinator获取endpoint可能同时发生。为了确保rpccontinuousshufflewriter能够获取其所有下游rpccontinuousshufflereader的endpoint,rpccontinuousshufflewriter需要一直轮询直至获取到其所有下游rpccontinuousshufflereader的endpoint为止,其伪代码如下:
其中,getallendpoints需要轮询直到所有需要的rpccontinuousshufflereader的endpoint都被注册到continuosshufflecoordinator中。
步骤308,通过远程过程调用请求将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在本实施例中,通过rpc请求可以将数据写入多级stage中的rpccontinuousshufflereader的缓存中。
步骤309,通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。
在本实施例中,通过多级stage中的rpccontinuousshufflereader可以将数据对齐。通常,为了保证数据的正确性,在每级stage中需要保证epoch的对齐。其中,一个epoch对应一批数据。在流式计算中,数据源源不断地流入系统,系统将这些永无止尽地数据流切分成一个个epoch,一个epoch是整个数据流中的一段数据的集合。
通常,下游的每个partition对应一个rpccontinuousshufflereader,每个partition的计算,需要等待上游所有partition的epochmarker(epoch标识)均到达后才能开始计算。其中,epochmarker是一种数据类型,用于标识一个epoch的结束。
需要说明的是,当前spark社区中rpccontinuousshufflereader的实现已经确保了每次返回的一个iterator(迭代器)对应的是一个epoch范围内的数据。当一个epoch的数据读取完毕后,会返回一个null,然后继续获取下一个epoch对应的iterator。而当前rpccontinuousshufflereader的实现已经可以满足epoch对齐,不需要额外改动。
为了便于理解,图4其示出了两级stage的结构示意图。如图4所示,两级stage包括stage-1和stage-2。其中,stage-1是stage-2的映射端,包括n个paritition,即paritition1到parititionn。每个paritition对应一个rpccontinuousshufflewriter。stage-2是stage-1的归约端,包括n个rpccontinuousshufflereader。rpccontinuousshufflereader将其endpoint注册到continuousshufflecoordinator中。rpccontinuousshufflewriter从continuousshufflecoordinator中获取endpoint,并通过rpc请求将数据写入rpccontinuousshufflereader中。
应当理解的是,图4仅仅示出了两级stage的结构示意图,对于大于两级的多级stage,在其中间扩展同时包括rpccontinuousshufflewriter和rpccontinuousshufflereader的stage即可。
从图3中可以看出,与图2对应的实施例相比,本实施例中的数据洗牌方法的流程300突出了数据写入步骤。由此,本实施例描述的方案将rpccontinuousshufflereader的endpoint注册到driver中,可以方便rpccontinuousshufflewriter获取相应的rpccontinuousshufflereader的endpoint,从而实现提交到不同机器上多级阶段的数据传输。
进一步参考图5,作为对上述各图所示的方法的实现,本申请提供了一种数据洗牌装置的一个实施例,该装置实施例与图2所示的方法实施例相对应,该装置具体可以应用于各种电子设备中。
如图5所示,本实施例的数据洗牌装置500可以包括:获取单元501、写入单元502和对齐单元503。其中,获取单元501,被配置成获取多级阶段;写入单元502,被配置成从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;对齐单元503,被配置成通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。
在本实施例中,数据洗牌装置500中:获取单元501、写入单元502和对齐单元503的具体处理及其所带来的技术效果可分别参考图2对应实施例中的步骤201-203的相关说明,在此不再赘述。
在本实施例的一些可选的实现方式中,写入单元502包括:写入子单元(图中未示出),被配置成对于多级阶段中的相邻两级阶段,从相邻两级阶段中的映射端的基于远程过程调用数据洗牌的持续写模块向相邻两级阶段中的归约端的基于远程过程调用数据洗牌的持续读模块写入数据,其中,映射端是相邻两级阶段中的上级阶段,归约端是相邻两级阶段中的下级阶段。
在本实施例的一些可选的实现方式中,多级阶段包括第一级阶段、中间级阶段和最后一级阶段;以及写入子单元包括:第一写入模块(图中未示出),被配置成从第一级阶段的基于远程过程调用数据洗牌的持续写模块向中间级阶段的基于远程过程调用数据洗牌的持续读模块写入数据;第二写入模块(图中未示出),被配置成通过中间级阶段的基于远程过程调用数据洗牌的持续写模块从中间级阶段的基于远程过程调用数据洗牌的持续读模块读取数据,以及向最后一级阶段的基于远程过程调用数据洗牌的持续读模块写入数据。
在本实施例的一些可选的实现方式中,多级阶段是有向无环图调度器通过如下步骤提交到集群中的:按照多级阶段的顺序依次提交各级阶段。
在本实施例的一些可选的实现方式中,按照多级阶段的顺序依次提交各级阶段,包括:通过在有向无环图调度器中增加用于一次提交所有阶段的方法提交多级阶段。
在本实施例的一些可选的实现方式中,写入单元502包括:注册子单元(图中未示出),被配置成通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将终端地址注册到驱动器的处理连续模型数据洗牌操作的协调器中;获取及写入子单元(图中未示出),被配置成通过多级阶段中的基于远程过程调用数据洗牌的持续写模块从处理连续模型数据洗牌操作的协调器中获取终端地址,以及基于终端地址将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在本实施例的一些可选的实现方式中,注册子单元包括:增加模块(图中未示出),被配置成在驱动器中增加处理连续模型数据洗牌操作的协调器;创建模块(图中未示出),被配置成在执行器中创建多级阶段中的基于远程过程调用数据洗牌的持续读模块;注册模块(图中未示出),被配置成通过远程过程调用请求将终端地址注册到处理连续模型数据洗牌操作的协调器中。
在本实施例的一些可选的实现方式中,创建模块进一步被配置成:基于终端地址的名称和分区标识从处理连续模型数据洗牌操作的协调器中获取终端地址的引用实例,其中,处理连续模型数据洗牌操作的协调器中存储终端地址名称映射和终端地址分区映射;创建封装连续远程过程调用数据洗牌逻辑的弹性分布式数据集;通过弹性分布式数据集对终端地址对应的分区创建基于远程过程调用数据洗牌的持续读模块。
在本实施例的一些可选的实现方式中,写入单元502还包括:第一创建子单元(图中未示出),被配置成创建连续模型远程过程调用数据洗牌中映射阶段的任务;第二创建子单元(图中未示出),被配置成在连续模型远程过程调用数据洗牌中映射阶段的任务中创建多级阶段中的基于远程过程调用数据洗牌的持续写模块;以及获取及写入子单元进一步被配置成:通过远程过程调用请求将数据写入多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
在本实施例的一些可选的实现方式中,写入单元502还包括:取消子单元(图中未示出),被配置成取消多级阶段中的基于远程过程调用数据洗牌的持续写模块对下一级分区个数的限制。
在本实施例的一些可选的实现方式中,数据洗牌装置500还包括:更新及汇报单元(图中未示出),被配置成在连续处理模式下,通过水印协调器汇报和更新数据的水印,以及处理数据中的过期数据。
在本实施例的一些可选的实现方式中,更新及汇报单元进一步被配置成:创建水印协调器;在驱动器中统计从执行器汇报的最大事件时间值,以及计算全局水印值;对用于处理基于事件时间的水印的代码逻辑进行修改,以使处于连续处理模式下;汇报和计算水印,以及更新计算时依赖的水印值。
下面参考图6,其示出了适于用来实现本申请实施例的电子设备的计算机系统600的结构示意图。图6示出的电子设备仅仅是一个示例,不应对本申请实施例的功能和使用范围带来任何限制。
如图6所示,计算机系统600包括中央更新及汇报单元(cpu)601,其可以根据存储在只读存储器(rom)602中的程序或者从存储部分608加载到随机访问存储器(ram)603中的程序而执行各种适当的动作和处理。在ram603中,还存储有系统600操作所需的各种程序和数据。cpu601、rom602以及ram603通过总线604彼此相连。输入/输出(i/o)接口605也连接至总线604。
以下部件连接至i/o接口605:包括键盘、鼠标等的输入部分606;包括诸如阴极射线管(crt)、液晶显示器(lcd)等以及扬声器等的输出部分607;包括硬盘等的存储部分608;以及包括诸如lan卡、调制解调器等的网络接口卡的通信部分609。通信部分609经由诸如因特网的网络执行通信处理。驱动器610也根据需要连接至i/o接口605。可拆卸介质611,诸如磁盘、光盘、磁光盘、半导体存储器等等,根据需要安装在驱动器610上,以便于从其上读出的计算机程序根据需要被安装入存储部分608。
特别地,根据本公开的实施例,上文参考流程图描述的过程可以被实现为计算机软件程序。例如,本公开的实施例包括一种计算机程序产品,其包括承载在计算机可读介质上的计算机程序,该计算机程序包含用于执行流程图所示的方法的程序代码。在这样的实施例中,该计算机程序可以通过通信部分609从网络上被下载和安装,和/或从可拆卸介质611被安装。在该计算机程序被中央更新及汇报单元(cpu)601执行时,执行本申请的方法中限定的上述功能。
需要说明的是,本申请所述的计算机可读介质可以是计算机可读信号介质或者计算机可读存储介质或者是上述两者的任意组合。计算机可读存储介质例如可以是——但不限于——电、磁、光、电磁、红外线、或半导体的系统、装置或器件,或者任意以上的组合。计算机可读存储介质的更具体的例子可以包括但不限于:具有一个或多个导线的电连接、便携式计算机磁盘、硬盘、随机访问存储器(ram)、只读存储器(rom)、可擦式可编程只读存储器(eprom或闪存)、光纤、便携式紧凑磁盘只读存储器(cd-rom)、光存储器件、磁存储器件、或者上述的任意合适的组合。在本申请中,计算机可读存储介质可以是任何包含或存储程序的有形介质,该程序可以被指令执行系统、装置或者器件使用或者与其结合使用。而在本申请中,计算机可读的信号介质可以包括在基带中或者作为载波一部分传播的数据信号,其中承载了计算机可读的程序代码。这种传播的数据信号可以采用多种形式,包括但不限于电磁信号、光信号或上述的任意合适的组合。计算机可读的信号介质还可以是计算机可读存储介质以外的任何计算机可读介质,该计算机可读介质可以发送、传播或者传输用于由指令执行系统、装置或者器件使用或者与其结合使用的程序。计算机可读介质上包含的程序代码可以用任何适当的介质传输,包括但不限于:无线、电线、光缆、rf等等,或者上述的任意合适的组合。
可以以一种或多种程序设计语言或其组合来编写用于执行本申请的操作的计算机程序代码,所述程序设计语言包括面向目标的程序设计语言—诸如java、smalltalk、c ,还包括常规的过程式程序设计语言—诸如”c”语言或类似的程序设计语言。程序代码可以完全地在用户计算机上执行、部分地在用户计算机上执行、作为一个独立的软件包执行、部分在用户计算机上部分在远程计算机上执行、或者完全在远程计算机或电子设备上执行。在涉及远程计算机的情形中,远程计算机可以通过任意种类的网络——包括局域网(lan)或广域网(wan)—连接到用户计算机,或者,可以连接到外部计算机(例如利用因特网服务提供商来通过因特网连接)。
附图中的流程图和框图,图示了按照本申请各种实施例的系统、方法和计算机程序产品的可能实现的体系架构、功能和操作。在这点上,流程图或框图中的每个方框可以代表一个模块、程序段、或代码的一部分,该模块、程序段、或代码的一部分包含一个或多个用于实现规定的逻辑功能的可执行指令。也应当注意,在有些作为替换的实现中,方框中所标注的功能也可以以不同于附图中所标注的顺序发生。例如,两个接连地表示的方框实际上可以基本并行地执行,它们有时也可以按相反的顺序执行,这依所涉及的功能而定。也要注意的是,框图和/或流程图中的每个方框、以及框图和/或流程图中的方框的组合,可以用执行规定的功能或操作的专用的基于硬件的系统来实现,或者可以用专用硬件与计算机指令的组合来实现。
描述于本申请实施例中所涉及到的单元可以通过软件的方式实现,也可以通过硬件的方式来实现。所描述的单元也可以设置在处理器中,例如,可以描述为:一种处理器包括获取单元、写入单元和对齐单元。其中,这些单元的名称在种情况下并不构成对该单元本身的限定,例如,获取单元还可以被描述为“获取多级阶段的单元”。
作为另一方面,本申请还提供了一种计算机可读介质,该计算机可读介质可以是上述实施例中描述的电子设备中所包含的;也可以是单独存在,而未装配入该电子设备中。上述计算机可读介质承载有一个或者多个程序,当上述一个或者多个程序被该电子设备执行时,使得该电子设备:获取多级阶段;从多级阶段中的基于远程过程调用数据洗牌的持续写模块向多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;通过多级阶段中的基于远程过程调用数据洗牌的持续读模块将数据对齐。
以上描述仅为本申请的较佳实施例以及对所运用技术原理的说明。本领域技术人员应当理解,本申请中所涉及的发明范围,并不限于上述技术特征的特定组合而成的技术方案,同时也应涵盖在不脱离上述发明构思的情况下,由上述技术特征或其等同特征进行任意组合而形成的其它技术方案。例如上述特征与本申请中公开的(但不限于)具有类似功能的技术特征进行互相替换而形成的技术方案。
1.一种数据洗牌方法,包括:
获取多级阶段;
从所述多级阶段中的基于远程过程调用数据洗牌的持续写模块向所述多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;
通过所述多级阶段中的基于远程过程调用数据洗牌的持续读模块将所述数据对齐。
2.根据权利要求1所述的方法,其中,所述从所述多级阶段中的基于远程过程调用数据洗牌的持续写模块向所述多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据,包括:
对于所述多级阶段中的相邻两级阶段,从所述相邻两级阶段中的映射端的基于远程过程调用数据洗牌的持续写模块向所述相邻两级阶段中的归约端的基于远程过程调用数据洗牌的持续读模块写入所述数据,其中,所述映射端是所述相邻两级阶段中的上级阶段,所述归约端是所述相邻两级阶段中的下级阶段。
3.根据权利要求2所述的方法,其中,所述多级阶段包括第一级阶段、中间级阶段和最后一级阶段;以及
所述对于所述多级阶段中的相邻两级阶段,从所述相邻两级阶段中的映射端的基于远程过程调用数据洗牌的持续写模块向所述相邻两级阶段中的归约端的基于远程过程调用数据洗牌的持续读模块写入所述数据,包括:
从所述第一级阶段的基于远程过程调用数据洗牌的持续写模块向所述中间级阶段的基于远程过程调用数据洗牌的持续读模块写入所述数据;
通过所述中间级阶段的基于远程过程调用数据洗牌的持续写模块从所述中间级阶段的基于远程过程调用数据洗牌的持续读模块读取所述数据,以及向所述最后一级阶段的基于远程过程调用数据洗牌的持续读模块写入所述数据。
4.根据权利要求1所述的方法,其中,所述多级阶段是有向无环图调度器通过如下步骤提交到集群中的:
按照所述多级阶段的顺序依次提交各级阶段。
5.根据权利要求4所述的方法,其中,所述按照所述多级阶段的顺序依次提交各级阶段,包括:
通过在所述有向无环图调度器中增加用于一次提交所有阶段的方法提交所述多级阶段。
6.根据权利要求3所述的方法,其中,所述从所述多级阶段中的基于远程过程调用数据洗牌的持续写模块向所述多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据,包括:
通过所述多级阶段中的基于远程过程调用数据洗牌的持续读模块将终端地址注册到驱动器的处理连续模型数据洗牌操作的协调器中;
通过所述多级阶段中的基于远程过程调用数据洗牌的持续写模块从所述处理连续模型数据洗牌操作的协调器中获取所述终端地址,以及基于所述终端地址将所述数据写入所述多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
7.根据权利要求6所述的方法,其中,所述通过所述多级阶段中的基于远程过程调用数据洗牌的持续读模块将终端地址注册到驱动器的处理连续模型数据洗牌操作的协调器中,包括:
在所述驱动器中增加所述处理连续模型数据洗牌操作的协调器;
在执行器中创建所述多级阶段中的基于远程过程调用数据洗牌的持续读模块;
通过远程过程调用请求将所述终端地址注册到所述处理连续模型数据洗牌操作的协调器中。
8.根据权利要求7所述的方法,其中,所述在执行器中创建所述多级阶段中的基于远程过程调用数据洗牌的持续读模块,包括:
基于所述终端地址的名称和分区标识从所述处理连续模型数据洗牌操作的协调器中获取所述终端地址的引用实例,其中,所述处理连续模型数据洗牌操作的协调器中存储终端地址名称映射和终端地址分区映射;
创建封装连续远程过程调用数据洗牌逻辑的弹性分布式数据集;
通过所述弹性分布式数据集对所述终端地址对应的分区创建基于远程过程调用数据洗牌的持续读模块。
9.根据权利要求8所述的方法,其中,在所述通过所述多级阶段中的基于远程过程调用数据洗牌的持续写模块从所述处理连续模型数据洗牌操作的协调器中获取所述终端地址之前,还包括:
创建连续模型远程过程调用数据洗牌中映射阶段的任务;
在所述连续模型远程过程调用数据洗牌中映射阶段的任务中创建所述多级阶段中的基于远程过程调用数据洗牌的持续写模块;以及
所述基于所述终端地址将所述数据写入所述多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中,包括:
通过远程过程调用请求将所述数据写入所述多级阶段中的基于远程过程调用数据洗牌的持续读模块的缓存中。
10.根据权利要求9所述的方法,其中,所述在所述连续模型远程过程调用数据洗牌中映射阶段的任务中创建所述多级阶段中的基于远程过程调用数据洗牌的持续写模块之后,还包括:
取消所述多级阶段中的基于远程过程调用数据洗牌的持续写模块对下一级分区个数的限制。
11.根据权利要求10所述的方法,其中,所述方法还包括:
在连续处理模式下,通过水印协调器汇报和更新所述数据的水印,以及处理所述数据中的过期数据。
12.根据权利要求11所述的方法,其中,所述通过水印协调器汇报和更新所述数据的水印,包括:
创建所述水印协调器;
在所述驱动器中统计从所述执行器汇报的最大事件时间值,以及计算全局水印值;
对用于处理基于事件时间的水印的代码逻辑进行修改,以使处于所述连续处理模式下;
汇报和计算所述水印,以及更新计算时依赖的水印值。
13.一种数据洗牌装置,包括:
获取单元,被配置成获取多级阶段;
写入单元,被配置成从所述多级阶段中的基于远程过程调用数据洗牌的持续写模块向所述多级阶段中的基于远程过程调用数据洗牌的持续读模块写入数据;
对齐单元,被配置成通过所述多级阶段中的基于远程过程调用数据洗牌的持续读模块将所述数据对齐。
14.一种电子设备,包括:
一个或多个处理器;
存储装置,其上存储有一个或多个程序,
当所述一个或多个程序被所述一个或多个处理器执行,使得所述一个或多个处理器实现如权利要求1-12中任一所述的方法。
15.一种计算机可读介质,其上存储有计算机程序,其中,所述计算机程序被处理器执行时实现如权利要求1-12中任一所述的方法。
技术总结