本发明涉及互联网技术领域,尤其是一种分布式消息发送的处理系统及其处理方法。
背景技术:
随着互联网的飞速发展,业务增长迅速,用户量逐渐增多,业务功能越来越复杂,需要与用户交互的消息也越来越重要。
目前主要的消息发送方式有即时通讯软件模板消息,即时通讯软件客服消息,即时通讯软件小程序消息,短信,应用(application,app)消息,邮件等。
然而,面对庞大的消息量以及复杂的业务场景,每天都会有大量的消息数据需要进行相应发送。如果短时间内需要发送大量消息数据,会使得消息数据大量堆积在消息队列中等待处理,影响消息数据的发送效率,进而影响与用户之间的信息交互。
技术实现要素:
本发明所要解决的一个技术问题是目前现有技术中如果短时间内需要发送大量消息数据,会使得消息数据大量堆积在消息队列中等待处理,影响消息数据的发送效率,进而影响与用户之间的信息交互的技术问题。
根据本发明的一个方面,提供了一种分布式消息数据的处理系统,所述处理系统包括:至少一个容器节点,用于对待发送处理的消息数据进行分布式发送处理;
其中,所述容器节点各自能够启动至少一个线程获取所述消息数据进行分布式发送处理;所述处理系统中的容器节点个数以及单个容器节点对应启动的线程数,能够根据待发送处理的消息数据的消息量进行动态调整。
可选的,所述容器节点各自能够处理多种不同消息类型的消息数据,其中,每种消息类型的消息数据在发送时都有各自对应的业务处理方处理发送。
可选的,所述容器节点包括:锁定单元;
所述锁定单元,用于单个容器节点在调用单个线程从预定存储位置中获取目标消息数据进行发送处理时,对所述预定存储位置中的所述目标消息数据进行锁定处理,使得所述单个线程唯一处理所述目标消息数据,其中,所述预定存储位置中保存有待发送处理的消息数据,所述处理系统中的容器节点都从所述预定存储位置中获取消息数据进行发送处理。
可选的,所述容器节点还包括:检查单元;
所述检查单元,用于在对所述目标消息数据进行发送处理之前,对所述目标消息数据进行无效数据检查和/或数据安全校验;若所述目标消息数据未通过无效数据检查和/或未通过数据安全校验,则对所述目标消息数据进行异常标记,并停止对所述目标消息数据进行发送处理。
可选的,所述容器节点还包括:限制单元;
所述限制单元,用于对单个容器节点处理的消息数据进行容器内并发限制处理,使得同一业务对应的多个消息数据,按照所述多个消息数据之间的预设先后顺序进行依次发送。
可选的,所述容器节点还包括:处理单元;
所述处理单元,用于若需要通过公众号发送消息数据,则获取用户的公众号关注记录;根据所述公众号关注记录,判断用户是否关注待发送的目标公众号;若用户已关注所述目标公众号,则通过所述目标公众号发送对应的消息数据;若用户未关注所述目标公众号,则通过所述目标公众号对应相同业务的其他公众号发送消息数据,其中用户已关注所述其他公众号。
可选的,所述处理系统还包括:检测模块和处理模块;
所述检测模块,用于检测单个容器节点对应启动的线程中是否存在异常的线程;
所述处理模块,用于若所述检测模块检测出存在异常的线程,则输出告警信息并对所述异常的线程进行异常修复。
可选的,所述处理系统还包括:创建模块;所述创建模块,用于通过复制无状态的容器节点创建所述处理系统中的各个容器节点。
根据本发明的另一方面,提供了一种分布式消息数据的处理方法,该方法包括:
获取待发送处理的消息数据;
利用分布式消息数据的处理系统中的至少一个容器节点,对所述消息数据进行分布式发送处理;
其中,所述容器节点各自能够启动至少一个线程获取所述消息数据进行分布式发送处理;所述处理系统中的容器节点个数以及单个容器节点对应启动的线程数,能够根据待发送处理的消息数据的消息量进行动态调整。
可选的,所述利用所述处理系统中的至少一个容器节点,对所述消息数据进行分布式发送处理,包括:
利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,并使得所述至少一个容器节点之间处理不同的消息数据;
其中,单个容器节点在调用单个线程从所述预定存储位置中获取目标消息数据进行发送处理时,对所述预定存储位置中的所述目标消息数据进行锁定处理,使得所述单个线程唯一处理所述目标消息数据。
可选的,所述利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,包括:
对单个容器节点处理的目标消息数据进行无效数据检查和/或数据安全校验;
若所述目标消息数据未通过无效数据检查和/或未通过数据安全校验,则对所述目标消息数据进行异常标记,并停止对所述目标消息数据进行发送处理。
可选的,所述利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,还包括:
对单个容器节点处理的目标消息数据进行容器内并发限制处理,使得同一业务对应的多个消息数据,按照所述多个消息数据之间的预设先后顺序进行依次发送。
可选的,利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,还包括:
若需要通过公众号发送目标消息数据,则获取用户的公众号关注记录;
根据所述公众号关注记录,判断用户是否关注待发送的目标公众号;
若用户已关注所述目标公众号,则通过所述目标公众号发送所述目标消息数据;
若用户未关注所述目标公众号,则通过所述目标公众号对应相同业务的其他公众号发送所述目标消息数据,其中用户已关注所述其他公众号。
可选的,所述方法还包括:
检测单个容器节点对应启动的线程中是否存在异常的线程;
若检测出存在异常的线程,则输出告警信息并对所述异常的线程进行异常修复。
可选的,所述方法还包括:创建无状态的第一个容器节点;通过复制所述第一个容器节点,创建所述处理系统中的各个容器节点。
依据本发明又一个方面,提供了一种存储介质,其上存储有计算机程序,所述程序被处理器执行时实现上述分布式消息数据的处理方法。
依据本发明再一个方面,提供了一种电子设备,包括:上述分布式消息数据的处理系统。
借由上述技术方案,本发明提供的一种分布式消息发送的处理系统及其处理方法,与目前现有技术中相比,本发明可利用分布式消息发送系统中的容器节点,对待发送处理的消息数据进行分布式发送处理。并且每个容器节点各自能够启动至少一个线程获取消息数据进行分布式发送处理,相当于利用将各个容器节点中的各个启动线程作为分布式系统中的处理节点,对消息数据进行分布式发送处理,与传统的分布式处理系统相比,处理数据的节点数量更加庞大,进而数据能力能够大大增强,从而可大大提升大量待发送消息数据的处理进度,提高消息数据的发送效率,实现及时与用户之间的信息交互。做到整合消息发送的目的,保证消息发送系统的高可用性,高性能,数据一致性。并且这些容器节点可都在一台终端或少量几台终端中创建得到,节省资源占用。在处理过程中,可根据待发送处理的消息数据的消息量,对处理系统中的容器节点个数以及单个容器节点对应启动的线程数进行动态调整,在保证业务处理需求的情况下,尽可能的减少资源占用过度。
下面通过附图和实施例,对本发明的技术方案做进一步的详细描述。
附图说明
构成说明书的一部分的附图描述了本发明的实施例,并且连同描述一起用于解释本发明的原理。
参照附图,根据下面的详细描述,可以更加清楚地理解本发明,其中:
图1示出了本发明实施例提供的一种分布式消息数据的处理系统的结构示意图;
图2示出了本发明实施例提供的另一种分布式消息数据的处理系统的结构示意图;
图3示出了本发明实施例提供的一种分布式消息数据的处理方法的流程示意图;
图4示出了本发明实施例提供的另一种分布式消息数据的处理方法的流程示意图;
图5示出了本发明实施例提供的不同业务消息数据的统一处理流程实例示意图;
图6示出了本发明实施例提供的单个容器节点的应用实例示意图;
图7示出了本发明实施例提供的一种应用场景的流程实例示意图;
图8示出了本发明实施例提供的多个容器节点的应用实例示意图。
具体实施方式
现在将参照附图来详细描述本发明的各种示例性实施例。应注意到:除非另外具体说明,否则在这些实施例中阐述的部件和步骤的相对布置、数字表达式和数值不限制本发明的范围。
同时,应当明白,为了便于描述,附图中所示出的各个部分的尺寸并不是按照实际的比例关系绘制的。
以下对至少一个示例性实施例的描述实际上仅仅是说明性的,决不作为对本发明及其应用或使用的任何限制。
对于相关领域普通技术人员已知的技术、方法和设备可能不作详细讨论,但在适当情况下,所述技术、方法和设备应当被视为说明书的一部分。
应注意到:相似的标号和字母在下面的附图中表示类似项,因此,一旦某一项在一个附图中被定义,则在随后的附图中不需要对其进行进一步讨论。
本发明实施例可以应用于计算机系统/服务器,其可与众多其它通用或专用计算系统环境或配置一起操作。适于与计算机系统/服务器一起使用的众所周知的计算系统、环境和/或配置的例子包括但不限于:个人计算机系统、服务器计算机系统、瘦客户机、厚客户机、手持或膝上设备、基于微处理器的系统、机顶盒、可编程消费电子产品、网络个人电脑、小型计算机系统、大型计算机系统和包括上述任何系统的分布式云计算技术环境,等等。
计算机系统/服务器可以在由计算机系统执行的计算机系统可执行指令(诸如程序模块)的一般语境下描述。通常,程序模块可以包括例程、程序、目标程序、组件、逻辑、数据结构等等,它们执行特定的任务或者实现特定的抽象数据类型。计算机系统/服务器可以在分布式云计算环境中实施,分布式云计算环境中,任务是由通过通信网络链接的远程处理设备执行的。在分布式云计算环境中,程序模块可以位于包括存储设备的本地或远程计算系统存储介质上。
下文中将参考附图并结合实施例来详细说明本申请。需要说明的是,在不冲突的情况下,本发明中的实施例及实施例中的特征可以相互组合。
针对目前现有技术中如果短时间内需要发送大量消息数据,会使得消息数据大量堆积在消息队列中等待处理,影响消息数据的发送效率,进而影响与用户之间的信息交互的技术问题。本实施例提供了一种分布式消息数据的处理系统,如图1所示,该系统包括:至少一个容器节点11,用于对待发送处理的消息数据进行分布式发送处理;其中,这些容器节点11各自能够启动至少一个线程获取消息数据进行分布式发送处理;对于本实施例,本处理系统中的容器节点都各自绑定有消息发送处理机制(相当于绑定了消息发送处理的工作流程,即包含消息生产者、消息队列、消息消费者等一系列完整的消息发送处理的流程)。在单个容器节点调用单个线程执行消息数据发送处理时,可按照该消息发送处理流程进行相应处理。并且根据容器节点的处理能力,为这些容器节点分配最大数量的启动线程,如处理能力较高的容器节点可分配更多数量的启动线程等。
本实施例中的处理系统,相当于利用将各个容器节点中的各个启动线程作为分布式系统中的处理节点,对消息数据进行分布式发送处理,与传统的分布式处理系统架构(如包含多台独立的终端,通过这些终端来实现分布式数据处理)相比,处理数据的节点数量更加庞大,进而数据能力能够大大增强,从而可大大提升大量待发送消息数据的处理进度,可提高消息数据的发送效率,实现及时与用户之间的信息交互。做到整合消息发送的目的,保证消息发送系统的高可用性,高性能,数据一致性。并且这些容器节点可都在一台终端或少量几台终端中创建得到,在保证消息数据处理进度的情况下,节省硬件系统资源。
本处理系统中的容器节点11的个数以及单个容器节点11对应启动的线程数,能够根据待发送处理的消息数据的消息量进行动态调整。具体的,消息量越大,可创建更多数量的容器节点和/或增加容器节点中启动的线程数,其相应所占的内存资源也会分配更多。例如,在待处理的消息量增加时,可首先提高单个容器节点的线程数,如果各个容器节点启动的线程数已经临近到达或已经到达其对应的最大线程数时,可进行创建新的容器节点,以满足这些量级的消息数据的处理进度要求。
而在消息量变少时,可相应进行缩容,即清除过多的容器节点和/或减少容器节点中启动的线程数,其相应所占的内存资源也会分配减少。例如,在待处理的消息量减少时,可减少某一个或某几个容器节点的线程数,如果这些容器节点启动的线程数已经临近为0或已经为0,且仍然能够满足当前消息数据处理进度需求时,可删除这些线程数邻近为0或已经为0的容器节点,进而在保证消息数据处理进度的情况下,节省了硬件系统资源。
通过应用本实施例提供的分布式消息发送的处理系统,与目前现有技术相比,本实施例可利用分布式消息发送系统中的容器节点,对待发送处理的消息数据进行分布式发送处理。并且每个容器节点各自能够启动至少一个线程获取消息数据进行分布式发送处理,相当于利用将各个容器节点中的各个启动线程作为分布式系统中的处理节点,对消息数据进行分布式发送处理,与传统的分布式处理系统相比,处理数据的节点数量更加庞大,进而数据能力能够大大增强,从而可大大提升大量待发送消息数据的处理进度,提高消息数据的发送效率,实现及时与用户之间的信息交互。做到整合消息发送的目的,保证消息发送系统的高可用性,高性能,数据一致性。并且这些容器节点可都在一台终端或少量几台终端中创建得到,节省资源占用。在处理过程中,可根据待发送处理的消息数据的消息量,对处理系统中的容器节点个数以及单个容器节点对应启动的线程数进行动态调整,在保证业务处理需求的情况下,尽可能的减少资源占用过度。
除此之外,针对目前的多种消息类型(如短信、app消息、邮件等)的消息业务,各个业务自己都需要对接业务处理方(即具体承接消息发送的第三方服务平台,例如,对于短信消息发送,需要对接短信业务运营商平台;对于app消息发送,需要对接该app运营方平台等),使得开发成本高,维护性差。而采用本实施例中的处理系统,每个容器节点11各自能够处理多种不同消息类型的消息数据,其中,每种消息类型的消息数据在发送时都有各自对应的业务处理方处理发送。进而可由本消息发送系统统一对接各个第三方消息服务,可减少相应的业务开发成本,便于维护管理。
进一步的,作为上述实施例系统的细化和扩展,本处理系统可对接预定存储位置(如数据库、表等),该预定存储位置中保存有待发送处理的消息数据(如各个业务方将自己需要投放的消息数据先上传到该预定存储位置中),后续由本处理系统进行发送处理。本处理系统中的容器节点都可从该预定存储位置中获取待处理的消息数据进行发送处理。在此情况下,为了保证不同容器节点之间以及单个容器节点启动的不同线程之间,从该预定存储位置获取处理的消息数据不同,即不处理预定存储位置中同一个消息数据。可选的,如图2所示,容器节点11可包括:锁定单元1101;锁定单元1101可用于单个容器节点在调用单个线程从预定存储位置中获取目标消息数据进行发送处理时,对预定存储位置中的目标消息数据进行锁定处理,使得该线程可唯一处理目标消息数据。即除了该线程处理目标消息数据以外,其他线程以及其他容器节点都不可以处理该目标消息数据。通过这种方式,可解决消息重复发送的问题。
为了保证发送的消息数据尽可能的都是有效且正常的数据,避免发送异常数据,减少浪费系统资源。可选的,如图2所示,容器节点11还可包括:检查单元1102;检查单元1102可用于在对目标消息数据进行发送处理之前,对目标消息数据进行无效数据检查和/或数据安全校验;若目标消息数据未通过无效数据检查和/或未通过数据安全校验,则对目标消息数据进行异常标记,并停止对目标消息数据进行发送处理。例如,对目标消息数据进行异常数据检查,如果发现目标消息数据为异常数据,可进行异常标记,具体可标记该消息数据为何会被确定为异常,并对该目标消息数据进行拦截,停止对其进行发送处理。后续可根据该异常标记反馈给对应投放该消息数据的业务方,有关该消息数据发送失败的原因信息。通过这种方式,对消息数据在发送之前进行异常数据检查,可准确保证发送的消息数据都是有效的且安全的消息数据。
有时同一业务可能对应有多个消息数据,这些消息数据不能同时发送给用户,需要按照一定顺序发送,才能保证该业务的需求,如某业务的短信消息,短信消息内容过长,需要分多个消息发送,那么这些消息需要按照内容顺序发送,不能一次性的发送给用户,也不能打乱顺序发送,会影响用户的阅读行。因此为了满足这一需求,可选的,如图2所示,容器节点11还可包括:限制单元1103;限制单元1103可用于对单个容器节点处理的消息数据进行容器内并发限制处理,使得同一业务对应的多个消息数据,按照该多个消息数据之间的预设先后顺序(可由业务方进行预先设定,如按照内容、时间、相关性等因素排序)进行依次发送。保证这些消息数据可在不同的时间点并且按照先后顺序发送给用户,方便用户对这些消息进行阅读。
在具体的应用场景中,同一个业务可能会关联多个公众号,为了保证公众号类消息向用户发送的成功率,可选的,如图2所示,容器节点11还可包括:处理单元1104;处理单元1104可用于若需要通过公众号发送消息数据,则获取用户的公众号关注记录;再根据公众号关注记录,判断用户是否关注待发送的目标公众号;若用户已关注所述目标公众号,则通过目标公众号发送对应的消息数据;若用户未关注目标公众号,则通过目标公众号对应相同业务的其他公众号发送消息数据,其中用户已关注其他公众号。通过这种方式,可保证公众号类消息向用户发送的成功率,做到尽可能的将此类消息数据发送给用户。
进一步的,本实施例对于灾备与监控方面,可在每个容器节点中加入了对线程池和队列的监控,如果节点中出现生产者或消费者因为不可预见性问题,及时报警,并尝试自动化监控,以便实时解决异常情况,提高本处理系统的处理性能。相应的,如图2所示,本处理系统还可包括:检测模块12和处理模块13;检测模块12可用于检测单个容器节点对应启动的线程中是否存在异常的线程;处理模块13可用于若检测模块12检测出存在异常的线程,则输出告警信息并对异常的线程进行异常修复。例如,根据异常的线程分析异常原因进行及时报警,后续可尝试重新启动该异常的线程,判断其是否恢复正常来进行相应修复。
进一步的,为了方便创建本处理系统中的容器节点11,在本实施例中每个容器节点11都可设计成无状态(其中不包含待处理的消息数据,可只包含消息发送处理的工作流程机制)、可复制的形式。相应的,如图2所示,本系统还可包括:创建模块14;创建模块14可用于通过复制无状态的容器节点创建本处理系统中的各个容器节点。通过这种方式,可保证本处理系统的高可用与高性能,可提高容器节点的创建效率。
进一步的,为了说明本处理系统在应用中的具体处理方法,提供一种分布式消息数据的处理方法,可应用于本处理系统,如图3所示,该方法包括:
201、获取待发送处理的消息数据。
对于本实施例方法的执行主体可为消息数据处理的装置或设备,可作为本处理系统对应的中心控制节点或者独立于本处理系统的控制模块,具体用于控制本处理系统进行相应处理。
202、利用分布式消息数据的处理系统中的至少一个容器节点,对获取到的消息数据进行分布式发送处理。
其中,容器节点各自能够启动至少一个线程获取消息数据进行分布式发送处理;本处理系统中的容器节点个数以及单个容器节点对应启动的线程数,能够根据待发送处理的消息数据的消息量进行动态调整。
例如,可根据待发送处理的消息数据的消息量,创建和/或获取对应预设个数的容器节点。其中,不同的消息量可分别对应不同的预设个数,具体可根据实际需求而定,以保证不同消息量的消息数据能够尽可能的通过分布式消息发送系统的容器节点尽快分配处理发送。例如,按照100000:1的比例,30万的消息量可创建3个容器节点,用于对这些消息量的消息数据进行处理;50万的消息量可创建5个容器节点,用于对这些消息量的消息数据进行处理。需要说明的是,如果已经创建了容器节点,并且当前处于空闲状态,那么可直接获取调用,如果仍达不到消息量对应的预设个数,则可创建新的容器节点。
并且为了保证分布式消息发送系统高可用与高性能,在本实施例中每个容器节点都可设计成无状态(其中不包含待处理的消息数据,可只包含消息发送处理的工作流程机制)、可复制的形式,相应的,为了方便快速创建容器节点,上述根据消息数据的消息量,创建预设个数的容器节点,具体可包括:创建无状态的第一个容器节点;然后通过复制该第一个容器节点,得到预设个数的容器节点。
除了上述对容器节点进行动态调整的方式以外,本实施例还可根据消息数据的消息量对单个容器节点启动的线程数进行动态调整。例如,为了实现容器节点内线程数、内存等资源动态扩容缩容,本实施例方法还可包括:首先检测目标容器节点对应待发送处理的消息数据的消息量;然后根据检测到的消息量,调整目标容器节点对应启动的线程数以及相应需要占用的内存资源。在本可选方式中,通过动态扩容缩容,有效降低了资源的浪费。在消息量大时自动增加线程数、内存等资源,提高发送效率,当消息量减少后,自动降低增加线程数、内存等资源,减少资源消耗。
本实施例提供的一种分布式消息发送的处理方法,与目前现有技术相比,本实施例可利用分布式消息发送系统中的容器节点,对待发送处理的消息数据进行分布式发送处理。并且每个容器节点各自能够启动至少一个线程获取消息数据进行分布式发送处理,相当于利用将各个容器节点中的各个启动线程作为分布式系统中的处理节点,对消息数据进行分布式发送处理,与传统的分布式处理系统相比,处理数据的节点数量更加庞大,进而数据能力能够大大增强,从而可大大提升大量待发送消息数据的处理进度,提高消息数据的发送效率,实现及时与用户之间的信息交互。做到整合消息发送的目的,保证消息发送系统的高可用性,高性能,数据一致性。并且这些容器节点可都在一台终端或少量几台终端中创建得到,节省资源占用。在处理过程中,可根据待发送处理的消息数据的消息量,对处理系统中的容器节点个数以及单个容器节点对应启动的线程数进行动态调整,在保证业务处理需求的情况下,尽可能的减少资源占用过度。
进一步的,作为上述实施例方法的细化和扩展,为了完整说明本实施例方法的具体实现过程,提供了另一种分布式消息数据的处理方法,如图4所示,该方法包括:
301、获取待发送处理的消息数据。
302、利用分布式消息数据的处理系统中的至少一个容器节点从预定存储位置中获取消息数据进行分布式发送处理,并使得这些个容器节点之间处理不同的消息数据。
其中,单个容器节点在调用单个线程从预定存储位置中获取目标消息数据进行发送处理时,可对预定存储位置中的目标消息数据进行锁定处理,使得单个线程唯一处理目标消息数据。
目前消息数据发送的传统方式具有业务复杂、消息种类繁多、功能迭代快,开发人员能力不同等特点,这种情况下,可能更容易出现bug,会导致用户收到重复消息或者测试消息,影响用户体验。因此,本实施例方法为了避免这些容器节点处理相同的消息数据,具体在利用本处理系统中的容器节点,对待发送处理的消息数据进行分布式发送处理的过程中,使得容器节点之间处理不同的消息数据。进而减少用户接收到相同消息数据的可能性。
单个容器节点在对预定存储位置中的目标消息数据进行锁定处理的过程中,可在目标线程调用处理目标消息数据时,对目标消息数据进行锁定处理,使得除该目标线程调用处理该目标消息数据以外,其他线程不能同时调用处理该目标消息数据。例如,在具体的应用场景中,在同一个容器节点中多个生产者获取消息数据时,运用mysql的事务与select…forupdate,即通过上述锁定机制,保证每个消息生产者获取到的消息数据都不一样。在发送的过程中,可通过redis检查用户已经收到消息的类型与内容,保证数据一致性和解决了消息重复发送的问题。
进一步的,为了满足相关的业务需求,步骤302具体可包括:对单个容器节点处理的消息数据进行容器内并发限制处理,使得同一业务对应的多个消息数据,按照这些个消息数据之间的预设先后顺序进行依次发送。进而使得相同业务的多个相关消息数据尽可能的不在相同时间点向用户发送,后续可根据这些消息数据之间的发文顺序依次进行发送,这样避免用户一下子收到大量消息数据,并且分不清这些消息数据的发文顺序,影响用户对消息的阅读。
再进一步的,为了保证发送的消息数据尽可能的都是有效且正常的数据,避免发送异常数据,减少浪费系统资源。可选的,步骤302具体还可包括:对单个容器节点处理的目标消息数据进行无效数据检查和/或数据安全校验;若目标消息数据未通过无效数据检查和/或未通过数据安全校验,则对目标消息数据进行异常标记,并停止对目标消息数据进行发送处理。
例如,在对目标容器节点处理的消息数据进行容器内并发发送限制处理之前,首先可对消息数据进行异常数据检查,如果发现目标消息数据为异常数据,可进行异常标记,具体可标记该消息数据为何会被确定为异常,并对该目标消息数据进行拦截,停止对其进行发送处理。后续可根据该异常标记反馈给对应投放该消息数据的业务方,有关该消息数据发送失败的原因信息。如果该目标消息数据通过异常数据检查,未发现该目标消息数据存在异常,则可后续进行容器内并发发送限制处理。
对于本可选方式,具体的异常数据检查过程可根据实际业务需求设定相应的检查内容。示例性的,为了方便了解具体的异常数据检查过程,对目标消息数据进行异常数据检查,具体可包括:对目标消息数据进行无效数据检查和/或数据安全校验;若目标消息数据未通过无效数据检查和/或未通过数据安全校验,则确定目标消息数据未通过异常数据检查。
其中,对目标消息数据进行无效数据检查的过程可根据实际业务需求而定。例如,如果是短信类的消息数据,其必须包含用户对应的手机号,如果没有手机号等短信发送号码,则可认为是无效数据;如果公众号类的消息数据,其要保证对应的公众号为有效的公众号,如果是不可用的公众号,那么可认为是无效数据。
对于目标消息数据进行数据安全校验的过程也可根据实际业务需求而定。再例如,可通过黑名单,判断投放该目标消息数据的业务方是否为黑名单用户,以及还可以通过消息数据的条数限制,限制一些条数过多的消息数据等。
通过上述对消息数据在发送之前进行异常数据检查,可准确保证发送的消息数据都是有效的且安全的消息数据。
在具体的应用场景中,同一个业务可能会关联多个公众号,为了保证公众号类消息向用户发送的成功率,相应可选的,步骤302具体还可包括:若需要通过公众号发送目标消息数据,则获取用户的公众号关注记录;根据获取到的公众号关注记录,判断用户是否关注待发送的目标公众号;若用户已关注所述目标公众号,则通过目标公众号发送对应的该目标消息数据;若用户未关注目标公众号,则通过目标公众号对应相同业务的其他公众号发送该目标消息数据,其中,用户已关注其他公众号。通过这种方式,可保证公众号类消息向用户发送的成功率,做到尽可能的将此类消息数据发送给用户。
除了上述这些可选方式以外,本实施例对于灾备与监控方面,在每个容器节点中加入了对线程池和队列的监控,如果节点中出现生产者或消费者因为不可预见性问题,及时报警,并尝试自动化监控,具体可执行步骤303至304所示的过程。
303、在利用分布式消息数据的处理系统处理消息数据的过程中,检测单个容器节点对应启动的线程中是否存在异常的线程。
304、若检测出存在异常的线程,则输出告警信息并对异常的线程进行异常修复。
其中,告警信息可为文字告警、图片告警、音频告警、视频告警、灯光告警、振动告警等。例如,根据异常的线程分析异常原因进行及时报警,后续可尝试重新启动该异常的线程,判断其是否恢复正常来进行相应修复。
基于上述各个实施例内容,示例性的,如图5所示,为不同业务消息数据的统一处理流程实例示意图,本实施例中具体可将消息数据从生产到发送统一抽离到同一个系统,所有发送消息数据的动作只需要调用消息系统的rpc远程服务接口。在消息数据发送前,发送中,发送后对消息数据进行全面的维护与监控,保证数据的正确性与消息发送的及时性。如图5所示,本处理系统应用的整体架构可包括:业务方模块a1、消息内部服务模块a2、数据存储模块a3、消息发送服务模块a4、第三方处理模块a5。
其中,业务方模块a1的业务1、业务2、业务3分别可通过消息内部服务模块a2存储在本系统对应的数据存储模块a3的数据库(db)中。其中消息内部服务模块a2可提供数据校验(无效数据检查等)、消息跟踪、公众号多号配置等服务,可借助于rpc远程服务接口实现投递消息数据的业务方模块a1与本系统之间的数据通讯,并且可结合消息队列(mq)来进一步实现。在消息数据发送处理时,可通过消息发送服务模块a4将存储在db中的待发送消息数据,按照对应的消息类型,交付给对应的第三方处理模块a5(如短信供应商、app消息服务商等)进行相应发送,其中消息发送服务模块a4可提供风险控制(黑名单过滤等)、规则适配、灾备(线程异常告警与修复等)等服务,可借助于超文本传输协议(hypertexttransferprotocol,http)实现第三方与本系统之间的数据通讯。
并且为了保证分布式消息发送系统高可用与高性能,创建容器节点,该容器节点具体可为tomcat容器。并使得每个容器节点都有自己的消息生产者,消息队列,消息消费者(可根据实际情况配置一套或多套,如一个或多个消息生产者等)。保证只要有一个容器节点存活,消息数据就可以发出去。如果发送力不足的情况,可以快速上线新的容器节点,以便提高消息数据的发送效率。在高可用方面,通过对消息类型的资源隔离(消息生产者/消费者线程池,消息队列等)这一细节设计,保证某个消息数据可能出现未知问题时候,其他消息数据不会受到影响。
例如,如图6所示,每个容器节点(pod)都会对每一类资源进行监控,自动扩容缩容,异常报警,自动修复等应急处理方案。其中,只需要平行扩展容器节点,就可以达到提高消息数据发送性能的需求。即使只有一个容器节点存活,就可以保证消息正常发送。单个pod中可绑定有消息发送处理机制,具体的,可包括调度程序b1、生产者管理b2、消费者管理b3、消息队列管理b4等。在调度程序b1中可监听相应触发事件、配置数据预加载、管理者(生产者、消费者等)创建与销毁。在生产者管理b2中可包含生产者生命周期、扩容缩容、状态监控、线程异常告警、线程复活、平滑重启等处理规则;在消费者管理b3中同样也可包含消费者生命周期、扩容缩容、状态监控、线程异常告警、线程复活、平滑重启等处理规则。在队列管理b4中可包含队列生命周期、状态监控、队列大小告警等处理规则。在pod处理具体消息数据的过程中,可按照消息类型(如图6中b5),划分每个消息类型对应的消息生产者、队列和消息消费者,然后做到针对性的处理,避免不同类型的消息数据混杂在一块,影响消息发送的准确性。
基于上述系统,本实施例的方法可应用于多种应用场景。为了进一步说明本实施例方法具体实现过程,以其中一个应用场景实例进行举例说明。需要说明的是,本应用场景只是实例性给出,不做任何限定,相当于本实施例方法众多应用场景中的一个应用场景。
如图7所示,为单个容器节点中的实施过程,具体可包括:
c1、业务方将待发送的消息数据存储在数据库db中。
c2、本系统在处理时,首先消息生产者(producer)从db中获取待发送的消息数据,其中,该数据库用于存储不同业务方发来的需要发送给用户的消息数据。
c3、然后进行数据检查。
c31、如果检查不通过则标记为失败,并告知给业务方。
c4、如果检查通过则放入消息队列中。
c5、消息队列中的待处理数据依次被消息消费者(consumer)获取。
c6、对消息消息进行黑名单检查以及条数限制检查等。
c7、如果这一次检查再通过,则进行容器节点内并发发送限制处理,保证相同业务的多个消息数据能够分不同时间点发送给用户。
c71、将被限制的消息数据尝试放回消息队列。
c72、如果失败就再存储在数据库中并标记为待发送。
c8、将没被限制的消息数据通过相应接口api进行发送;
c81、这里还涉及到了同一个业务可能会关联多个公众号,为了保证公众号类消息向用户发送的成功率,需要保证用户已关注相应的公众号,再向用户发送相应的公众号类消息数据。因此需要查询是否存在相同业务的多个公众号。
c811、若存在多个公众号,则逐一尝试;在一个公众号判定无效时,获取下一个公众号配置信息进行重试。例如,如果用户未关注,则判断同一个业务的其他公众号该用户是否关注,找到用户关注的公众号发送相应的消息数据。
c812、如果尝试成功,则修改目标公众号为可用的公众号并将状态重置为待发送,返回给数据库中。
c9、消息数据发送成功后标记为成功。
通过上述方式,在消息生产者将待发送的消息数据放入消息队列时,会进行数据正确性校验,可保证消息内容的正确性。在消息消费者准备发送消息时,会进行消息并发性校验,可保证消息不会重复发送。
上述实例为单个容器节点的处理过程,其他容器节点也可按照相同的处理方式对消息数据进行发送处理。如图8所示,为多容器节点(pods)的处理架构示意图。具体可包括多个容器节点d1、缓存d2、数据库d3。
其中,d1中单个容器节点可包括至少1个消息生产者、至少1个消息队列、至少1个消息消费者,并且可对接业务处理方,即发送消息的第三方。
d3用于存储待发送处理的消息数据,在具体发送处理时调用缓存d2根据预置的发送规则,经过黑名单过滤等手段,利用这些个容器节点d1进行分布式发送处理。发送时可根据消息模板进行发送,以满足业务上的需求。
通过应用本实施例方法,在实际的应用当中,也取得不错的效果。如目前分布式消息系统有9个节点,最高可支持300000/m的消息发送,满足当前绝大数业务发送量。并且简化业务方发送消息的工作量,提供研发效率,减少测试成本,降低bug率。
基于上述的这些实施过程,可解决目前现有技术中如下几个技术问题:
1、重复对接第三消息服务,开发成本高,维护性差。
2、业务代码出问题后可能会导致用户收到重复消息或错误消息,没有统一的管理平台。
3、消息格式种类繁多,维护难度大,无法进行后续数据分析。
4、在短时间内发送大量消息,发送慢,发送出错后,无法及时恢复。
通过应用本实施例方法,整合了消息发送系统,在面对庞大的消息量,以及复杂的业务场景,可对其进行功能的维护,性能的提升,数据的管理。保证在及时通知到用户的同时,不错发,乱发消息。同时保证消息发送系统的高可用性,高性能,数据一致性。并且在消息发送系统中加入风险控制,检查消息重复问题,测试消息,错误消息。根据业务特性检查用户接受到的信息量,提高用户体验。同时设计消息发送系统的灾备方案,应急处理方案。
需要说明的是,本实施例方法用于解决大量消息发送及维护的问题,由于是没有状态的设计,所以适用于任何需要发送消息的业务,包括不限于短信,即时通讯软件消息,app消息,邮件等各类消息。
基于上述如图3和图4所示方法,相应的,本实施例还提供了一种存储介质,其上存储有计算机程序,该程序被处理器执行时实现上述如图3和图4所示的方法。
基于这样的理解,本实施例的技术方案可以以软件产品的形式体现出来,该软件产品可以存储在一个非易失性存储介质(可以是cd-rom,u盘,移动硬盘等)中,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)执行本申请各个实施场景所述的方法。
基于上述如图1和图2所示的系统,以及图3和图4所示方法的实施例,为了实现上述目的,本实施例还提供了一种电子设备,具体可以为个人计算机,服务器,智能手机,平板电脑,智能手表或者其他网络设备等,该实体设备包括如图1和图2所示的系统,在硬件上具体可包括存储介质和处理器;存储介质,用于存储计算机程序;处理器,用于执行计算机程序以实现上述如图3和图4所示的方法。
可选的,该实体设备还可以包括用户接口、网络接口、摄像头、射频(radiofrequency,rf)电路,传感器、音频电路、wi-fi模块等等。用户接口可以包括显示屏(display)、输入单元比如键盘(keyboard)等,可选用户接口还可以包括usb接口、读卡器接口等。网络接口可选的可以包括标准的有线接口、无线接口(如wi-fi接口)等。本领域技术人员可以理解,本实施例提供的电子设备结构并不构成对该电子设备的限定,可以包括更多或更少的部件,或者组合某些部件,或者不同的部件布置。
存储介质中还可以包括操作系统、网络通信模块。操作系统是管理上述实体设备硬件和软件资源的程序,支持信息处理程序以及其它软件和/或程序的运行。网络通信模块用于实现存储介质内部各组件之间的通信,以及与信息处理实体设备中其它硬件和软件之间通信。
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到本申请可以借助软件加必要的通用硬件平台的方式来实现,也可以通过硬件实现。通过应用本实施例的技术方案,可利用分布式消息发送系统中的容器节点,对待发送处理的消息数据进行分布式发送处理。其中,这些容器节点都各自绑定有消息发送处理机制,进而可分别处理各自被分配的待发送消息数据。并且这些容器节点各自能够启动多个线程同时进行不同消息数据的发送处理,从而可大大提升大量待发送消息数据的处理进度,提高消息数据的发送效率,实现及时与用户之间的信息交互。做到整合消息发送的目的,保证消息发送系统的高可用性,高性能,数据一致性。
本说明书中各个实施例均采用递进的方式描述,每个实施例重点说明的都是与其它实施例的不同之处,各个实施例之间相同或相似的部分相互参见即可。对于系统实施例而言,由于其与方法实施例基本对应,所以描述的比较简单,相关之处参见方法实施例的部分说明即可。
可能以许多方式来实现本发明的方法和系统。例如,可通过软件、硬件、固件或者软件、硬件、固件的任何组合来实现本发明的方法和系统。用于所述方法的步骤的上述顺序仅是为了进行说明,本发明的方法的步骤不限于以上具体描述的顺序,除非以其它方式特别说明。此外,在一些实施例中,还可将本发明实施为记录在记录介质中的程序,这些程序包括用于实现根据本发明的方法的机器可读指令。因而,本发明还覆盖存储用于执行根据本发明的方法的程序的记录介质。
本发明的描述是为了示例和描述起见而给出的,而并不是无遗漏的或者将本发明限于所公开的形式。很多修改和变化对于本领域的普通技术人员而言是显然的。选择和描述实施例是为了更好说明本发明的原理和实际应用,并且使本领域的普通技术人员能够理解本发明从而设计适于特定用途的带有各种修改的各种实施例。
1.一种分布式消息数据的处理系统,其特征在于,所述处理系统包括:至少一个容器节点,用于对待发送处理的消息数据进行分布式发送处理;
其中,所述容器节点各自能够启动至少一个线程获取所述消息数据进行分布式发送处理;所述处理系统中的容器节点个数以及单个容器节点对应启动的线程数,能够根据待发送处理的消息数据的消息量进行动态调整。
2.根据权利要求1所述的处理系统,其特征在于,所述容器节点各自能够处理多种不同消息类型的消息数据,其中,每种消息类型的消息数据在发送时都有各自对应的业务处理方处理发送。
3.根据权利要求1所述的处理系统,其特征在于,所述容器节点包括:锁定单元;
所述锁定单元,用于单个容器节点在调用单个线程从预定存储位置中获取目标消息数据进行发送处理时,对所述预定存储位置中的所述目标消息数据进行锁定处理,使得所述单个线程唯一处理所述目标消息数据,其中,所述预定存储位置中保存有待发送处理的消息数据,所述处理系统中的容器节点都从所述预定存储位置中获取消息数据进行发送处理。
4.根据权利要求3所述的处理系统,其特征在于,所述容器节点还包括:检查单元;
所述检查单元,用于在对所述目标消息数据进行发送处理之前,对所述目标消息数据进行无效数据检查和/或数据安全校验;若所述目标消息数据未通过无效数据检查和/或未通过数据安全校验,则对所述目标消息数据进行异常标记,并停止对所述目标消息数据进行发送处理。
5.根据权利要求4所述的处理系统,其特征在于,所述容器节点还包括:限制单元;
所述限制单元,用于对单个容器节点处理的消息数据进行容器内并发限制处理,使得同一业务对应的多个消息数据,按照所述多个消息数据之间的预设先后顺序进行依次发送。
6.根据权利要求5所述的处理系统,其特征在于,所述容器节点还包括:处理单元;
所述处理单元,用于若需要通过公众号发送消息数据,则获取用户的公众号关注记录;根据所述公众号关注记录,判断用户是否关注待发送的目标公众号;若用户已关注所述目标公众号,则通过所述目标公众号发送对应的消息数据;若用户未关注所述目标公众号,则通过所述目标公众号对应相同业务的其他公众号发送消息数据,其中用户已关注所述其他公众号。
7.根据权利要求1所述的处理系统,其特征在于,所述处理系统还包括:检测模块和处理模块;
所述检测模块,用于检测单个容器节点对应启动的线程中是否存在异常的线程;
所述处理模块,用于若所述检测模块检测出存在异常的线程,则输出告警信息并对所述异常的线程进行异常修复。
8.根据权利要求1所述的处理系统,其特征在于,所述处理系统还包括:创建模块;
所述创建模块,用于通过复制无状态的容器节点创建所述处理系统中的各个容器节点。
9.一种分布式消息数据的处理方法,其特征在于,所述处理方法包括:
获取待发送处理的消息数据;
利用分布式消息数据的处理系统中的至少一个容器节点,对所述消息数据进行分布式发送处理;
其中,所述容器节点各自能够启动至少一个线程获取所述消息数据进行分布式发送处理;所述处理系统中的容器节点个数以及单个容器节点对应启动的线程数,能够根据待发送处理的消息数据的消息量进行动态调整。
10.根据权利要求9所述的处理方法,其特征在于,所述利用分布式消息数据的处理系统中的至少一个容器节点,对所述消息数据进行分布式发送处理,包括:
利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,并使得所述至少一个容器节点之间处理不同的消息数据;
其中,单个容器节点在调用单个线程从所述预定存储位置中获取目标消息数据进行发送处理时,对所述预定存储位置中的所述目标消息数据进行锁定处理,使得所述单个线程唯一处理所述目标消息数据。
11.根据权利要求10所述的处理方法,其特征在于,所述利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,包括:
对单个容器节点处理的目标消息数据进行无效数据检查和/或数据安全校验;
若所述目标消息数据未通过无效数据检查和/或未通过数据安全校验,则对所述目标消息数据进行异常标记,并停止对所述目标消息数据进行发送处理。
12.根据权利要求11所述的处理方法,其特征在于,所述利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,还包括:
对单个容器节点处理的目标消息数据进行容器内并发限制处理,使得同一业务对应的多个消息数据,按照所述多个消息数据之间的预设先后顺序进行依次发送。
13.根据权利要求12所述的处理方法,其特征在于,所述利用所述至少一个容器节点从预定存储位置中获取所述消息数据进行分布式发送处理,还包括:
若需要通过公众号发送目标消息数据,则获取用户的公众号关注记录;
根据所述公众号关注记录,判断用户是否关注待发送的目标公众号;
若用户已关注所述目标公众号,则通过所述目标公众号发送所述目标消息数据;
若用户未关注所述目标公众号,则通过所述目标公众号对应相同业务的其他公众号发送所述目标消息数据,其中用户已关注所述其他公众号。
14.根据权利要求9所述的处理方法,其特征在于,所述方法还包括:
检测单个容器节点对应启动的线程中是否存在异常的线程;
若检测出存在异常的线程,则输出告警信息并对所述异常的线程进行异常修复。
15.根据权利要求9所述的处理方法,其特征在于,所述方法还包括:
创建无状态的第一个容器节点;
通过复制所述第一个容器节点,创建所述处理系统中的各个容器节点。
16.一种存储介质,其上存储有计算机程序,其特征在于,所述计算机程序被处理器执行时实现权利要求9至15中任一项所述的处理方法。
17.一种电子设备,其特征在于,包括:如权利要求1至8中任一项所述的处理系统。
技术总结