CN114942916A - Doris-based real-time data warehouse design method, device, equipment and storage medium - Google Patents
- Publication number
- CN114942916A (application CN202210325028.7A)
- Authority
- CN
- China
- Prior art keywords
- data
- doris
- real
- task
- time
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Granted
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/21—Design, administration or maintenance of databases
- G06F16/211—Schema design and management
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F11/00—Error detection; Error correction; Monitoring
- G06F11/30—Monitoring
- G06F11/3003—Monitoring arrangements specially adapted to the computing system or computing system component being monitored
- G06F11/302—Monitoring arrangements specially adapted to the computing system or computing system component being monitored where the computing system component is a software system
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/21—Design, administration or maintenance of databases
- G06F16/215—Improving data quality; Data cleansing, e.g. de-duplication, removing invalid entries or correcting typographical errors
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/245—Query processing
- G06F16/2455—Query execution
- G06F16/24552—Database cache management
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/25—Integrating or interfacing systems involving database management systems
- G06F16/254—Extract, transform and load [ETL] procedures, e.g. ETL data flows in data warehouses
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/28—Databases characterised by their database models, e.g. relational or object models
- G06F16/283—Multi-dimensional databases or data warehouses, e.g. MOLAP or ROLAP
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/44—Arrangements for executing specific programs
- G06F9/445—Program loading or initiating
- G06F9/44505—Configuring for program initiating, e.g. using registry, configuration files
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4843—Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4843—Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
- G06F9/485—Task life-cycle, e.g. stopping, restarting, resuming execution
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Software Systems (AREA)
- Data Mining & Analysis (AREA)
- Computing Systems (AREA)
- Quality & Reliability (AREA)
- Mathematical Physics (AREA)
- Computational Linguistics (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
Description
Technical Field
The present invention belongs to the technical field of data warehouses for logistics platforms, and in particular relates to a Doris-based real-time data warehouse design method, device, equipment and storage medium.
Background Art
With the continued growth of the express delivery industry, logistics express data has reached hundreds of millions of waybills per day. Beyond waybill data, this volume also includes data from intelligent hardware, such as monitoring data from vehicle cameras. Processing such data requires an interactive analysis platform on which business personnel can run ad hoc commands to query different data sets; the platform must also support standard SQL (Structured Query Language) syntax, to lower the learning cost, and queries must return quickly.
In real-time data processing, import tasks frequently hang or become blocked.
In addition, the daily data volume exceeds ten billion records. This not only demands fast analysis performance, but also requires joining detail-level tables of various waybill volumes against multiple billion-row waybill dimension tables. Waybill counts across multiple tables must be precisely deduplicated while data accuracy is guaranteed. The platform must also support exporting multiple kinds of reports and importing from multiple data sources. Existing logistics-platform data warehouse designs cannot meet these requirements.
Summary of the Invention
The present invention provides a Doris-based real-time data warehouse design method, device, equipment and storage medium, which solves the problem of real-time data imports hanging or becoming blocked, and also provides a solution that supports importing from more data sources.
The method includes:
importing business real-time data into a Kafka queue under a matching topic, providing at least one consumption task that consumes the data in the topic, and setting up a matching Routine Load task monitor;
the Routine Load task monitor reading, in real time, the offset that indicates the position of the data currently being consumed within the topic;
if the offset does not advance within a preset time, restarting the Routine Load task monitor and continuing the consumption task from that offset;
importing the Stream Load data stream produced by the executed consumption task into Doris.
Further, importing the Stream Load data produced by the executed consumption task into Doris further includes: creating, on the Flink real-time computing cluster, a sink that writes the Stream Load data stream to its final destination in Doris, namely flink-connector-dorisdb; the Stream Load data is buffered and imported into Doris in batches directly over HTTP.
The solution also covers real-time import of data that requires secondary business-logic processing, as follows:
after importing the Stream Load data stream produced by the executed consumption task into Doris, the method further includes:
identifying the real-time data that requires secondary business-logic processing, first applying the secondary business logic to that real-time data in Flink, then caching the logically processed data, and finally importing the cached data into Doris via Stream Load. Passing at least one piece of real-time data that requires secondary business-logic processing through Flink's secondary business-logic processing further includes: the secondary business logic of the at least one piece of real-time data is configured as a task management unit, which submits the matching job to the JobManager; the JobManager is responsible for scheduling the job and allocating resources, and assigns the job to the corresponding TaskManager; upon receiving the job, the TaskManager starts threads to execute it and reports the task status and its own running status to the JobManager; when the task finishes, the JobManager is notified and sends the data processed by the secondary business logic to the corresponding task management unit.
In one example, passing at least one piece of real-time data that requires secondary business-logic processing through Flink's secondary business-logic processing further includes:
first consuming data from the ods log topic, and then, according to user behavior, dividing the behavioral data into three kinds: page logs, exposure logs and startup logs;
defining a Flink state variable holding a time value, used to filter new-user data and determine whether a user is new: on a first visit the corresponding state is empty, so the current time is written into the state; otherwise the state is non-empty and the current user is reclassified as an existing user;
splitting the filtered data: the startup logs and the exposure data are written to side-output streams, while the page data is written to the main stream; the corresponding values are obtained from the JSON payload, startup records are written to the startup side-output stream, the remaining records, which are all page logs, are written to the main stream, and the exposure logs are written to the exposure side-output stream.
The method also includes importing offline business data, which includes:
in an offline business scenario, importing the offline business data from HDFS into Doris using Broker Load;
configuring scheduling tasks with DolphinScheduler via SQL, completing the processing of the offline business data in batches, and then aggregating the results into a summary table.
The method also includes importing relatively fixed data, which further includes:
connecting data that is relatively fixed in actual business scenarios directly to the data service hall, which in turn interfaces with the Power BI tool and front-end reports.
Based on the same concept, the present invention also provides a Doris-based real-time data warehouse design device, including:
a Routine Load task monitor setup module, configured to import business real-time data into a Kafka queue under a matching topic, then provide at least one consumption task that consumes the data in the topic, and set up a matching Routine Load task monitor;
a Routine Load task monitor processing module, configured to have the Routine Load task monitor read, in real time, the offset that indicates the position of the data currently being consumed within the topic;
a Routine Load task monitor restart module, configured to restart the Routine Load task monitor if the offset does not advance within a preset time, continuing the consumption task from that offset;
an import module, configured to import the Stream Load data stream produced by the executed consumption task into Doris.
Based on the same concept, the present invention also provides a computer device, including:
a memory for storing a processing program; and
a processor that, when executing the processing program, implements the Doris-based real-time data warehouse design method described above.
Based on the same concept, the present invention also provides a readable storage medium storing a processing program that, when executed by a processor, implements the Doris-based real-time data warehouse design method described above.
Compared with the prior art, the above technical solution offers the following benefits:
The invention implements a custom Routine Load monitor that reads the job's offset in real time. Once the offset is found to have stopped growing, the monitoring system stops the Routine Load task and restarts it, reusing the previous offset; this guarantees that data is neither lost nor duplicated while the Routine Load recovers. The invention thereby improves the efficiency and reliability of real-time data import and makes it far less prone to blocking. With no data backlog, the system runs smoothly even at peak data volumes.
The invention supports exporting multiple kinds of reports and importing from multiple data sources, which is highly significant for data warehouse construction in terms of improving business-support efficiency and reducing maintenance costs. For example, under complex business logic, business real-time data is first processed by Flink, then cached in Redis or PIKA, and imported into Doris via Stream Load once processing completes; in offline business scenarios, business data is imported from HDFS into Doris using Broker Load, complex business data is then batch-processed with DolphinScheduler via SQL, and the results are finally aggregated into a summary table; in actual business scenarios, the warehouse connects directly to the data service hall, which interfaces with the Power BI tool and front-end reports. Waybill counts across multiple tables are precisely deduplicated while data accuracy is guaranteed, fully supporting the export of multiple kinds of reports and the import of multiple data sources.
Brief Description of the Drawings
The specific embodiments of the present invention are described in further detail below with reference to the accompanying drawings, in which:
Fig. 1 is a schematic diagram of a first example of the Doris-based real-time data warehouse design method of the present invention;
Fig. 2 is a block diagram of the Doris-based real-time data warehouse design system of the present invention;
Fig. 3 is a flowchart of a second example of the Doris-based real-time data warehouse design method of the present invention;
Fig. 4 is a schematic diagram of an embodiment of the Doris-based real-time data warehouse design device of the present invention;
Fig. 5 is a schematic diagram of an embodiment of a computer device of the present invention.
Detailed Description of the Embodiments
The present invention is described in further detail below with reference to the accompanying drawings and specific embodiments; its advantages and features will become clearer from the following description and claims. It should be noted that the drawings are all highly simplified, use imprecise ratios, and serve only to conveniently and clearly illustrate the embodiments of the present invention.
It should be noted that all directional indications in the embodiments of the present invention (such as up, down, left, right, front, back, etc.) are used only to explain the relative positions, movements and so on of the components under a particular posture (as shown in the drawings); if that particular posture changes, the directional indication changes accordingly.
Embodiment 1
In the era of big data, data-processing capability has grown enormously, yet a major bottleneck remains at the final link: data application services. In the earlier era of business databases, high concurrency and high flexibility were at odds. When front-line staff flexibly assemble analytical SQL (Structured Query Language), drag-and-drop tools often generate SQL that is painful to read, yet the query engine is still expected to deliver strong performance, in both concurrency and query time, that satisfies customers, which is quite a challenge. Query-analysis engines are numerous and keep emerging, each with its own strengths and weaknesses, for example ADB, Hologres, Presto, Kylin and HBase. In essence, these products all trade resources or space for time: performance is improved by reworking computation mechanisms, speeding up hardware (such as SSDs, i.e. solid-state drives), speeding up indexing (such as bitmaps), and transforming space (such as precomputation). The constant stream of new and updated products also confirms that query engines still have many problems, that the pain points remain unresolved, and that much can still be done. Doris is a database system recently adapted to real-time analysis scenarios that genuinely solves the pain points of query engines in certain scenarios.
Doris consists of the core components FrontEnd DorisDB (front-end node) and BackEnd DorisDB (back-end node). The front-end node manages metadata and client connections and performs query planning and scheduling; the back-end node handles data storage, computation execution, replica management, and so on. The system also includes DorisManager (the Doris management tool) and the Broker: DorisManager provides visual tools for cluster management, online queries, fault queries, and monitoring alarms; the Broker handles auxiliary functions such as exporting data to and importing data from external storage (HDFS, the Hadoop distributed file system, or object storage). Doris can be accessed directly through a MySQL client.
The core components and operating modes of Doris are as follows (FrontEnd DorisDB is hereinafter abbreviated FE, and BackEnd DorisDB is abbreviated BE):
FE is introduced first.
1) FE manages metadata, executes SQL DDL commands, and maintains the catalog, tables, partitions, tablet replicas, and other information.
2) FE is deployed in high availability, using a replication protocol for leader election and master-slave metadata synchronization. All metadata modifications are performed by the FE leader node, while FE follower nodes can serve read operations. Metadata reads and writes satisfy sequential consistency. The number of FE nodes is 2n+1, tolerating n node failures; when the FE leader fails, a new leader is elected from the existing follower nodes to complete failover.
3) FE's SQL layer parses, analyzes, rewrites, semantically analyzes and applies relational-algebra optimization to the SQL submitted by users, producing a logical execution plan.
4) FE's Planner (the SQL physical planner) converts the logical plan into a physical plan that can be executed in a distributed manner and distributes it to a group of BEs.
5) FE supervises the BEs, manages their joining and leaving, and maintains the number of tablets (replicas) according to BE liveness and health.
6) FE coordinates data imports to guarantee import consistency.
BE is introduced next.
1) BE manages tablets (replicas), the sub-tables formed by partitioning and bucketing a table, stored in columnar format.
2) BE creates or deletes sub-tables under FE's direction.
3) BE receives the physical execution plan distributed by FE, which designates a BE coordinator node; under the coordinator's scheduling, the BE workers (BE execution nodes) cooperate to complete execution.
4) BE reads data from its local columnar storage engine and filters it quickly via indexes and predicate pushdown.
5) BE runs compaction tasks in the background to reduce read amplification during queries.
Taking a query as an example: through FE's organization, coordination and control, the submitted SQL is parsed, analyzed, rewritten, optimized and planned into a distributed execution plan, which is then executed by several BEs. One of these BEs is selected as the coordinator, which coordinates the n BEs performing local computation; the results are returned to the coordinator, which aggregates them and returns the final result to FE, and FE delivers it to the end user.
This technical solution provides a first example of the Doris-based real-time data warehouse design method (see Fig. 1), in which several kinds of data are imported into the Doris warehouse in different ways:
S1: Importing real-time data. Business real-time data is imported from Flume (a massive log collection, aggregation and transport system) or Canal (Alibaba's open-source middleware) into Kafka (a high-throughput distributed publish-subscribe messaging system); the business real-time data in Kafka undergoes multiple rounds of data processing and is then imported into Doris via SQL.
S2: Importing complex real-time data. Real-time data that requires secondary logical processing is referred to as complex real-time business logic. Such business real-time data is first processed by Flink (a distributed processing engine for streaming and batch data, i.e. the real-time computing cluster), then cached in Redis or PIKA (a Redis-like storage system open-sourced by Qihoo); once processing completes, the data can be imported into Doris via the Stream Load method (a synchronous import method). Specifically, the real-time data requiring secondary business-logic processing is identified first; the secondary business logic is applied to it in Flink, the logically processed data is then cached, and the cached data is finally imported into Doris via Stream Load.
S3: Importing offline business data. Business data is imported from HDFS (the Hadoop distributed file system) into Doris using Broker Load (an asynchronous import method); complex business data is then batch-processed via SQL using DolphinScheduler (a batch scheduler) and finally aggregated into a summary table. Put another way: if the data source is Hive/HDFS, Broker Load import is recommended; if there are many tables and importing them all is cumbersome, direct queries through Hive external tables can be considered (performance is worse than Broker Load import, but data relocation is avoided); and if a single table is extremely large, or a global data dictionary is needed for exact deduplication, Spark Load import can be considered.
S4: Importing fixed data (such as special spreadsheets). In actual business scenarios, such data is connected directly to the data service hall, which interfaces with the Power BI tool and front-end reports. Waybill counts across multiple tables are precisely deduplicated while data accuracy is guaranteed; this proposal supports the export of multiple kinds of reports and the import of multiple data sources; with no data backlog, the system runs smoothly at peak data volumes.
In the present invention, an import job passes through several stages:
1) LOADING
In this stage the data is first cleaned and transformed, then sent to the BEs for processing. After all the data has been imported, the job waits for it to take effect; during this wait the job status is still LOADING.
2) FINISHED
After all the data involved in the import job has taken effect, the job status becomes FINISHED and the imported data becomes queryable. FINISHED is a final state of an import job.
3) CANCELLED
Before the status becomes FINISHED, the job may be cancelled at any time and enter the CANCELLED state, for example through manual cancellation by the user or an import error. CANCELLED is also a final state of an import job.
The import jobs of the present invention apply to the following scenarios:
1) HDFS import
When the source data is stored in HDFS and its volume ranges from tens to hundreds of gigabytes, the Broker Load method can be used to import the data into DorisDB. This requires that the deployed Broker process can access the HDFS data source. The import job executes asynchronously, and users can check the result with the SHOW LOAD command.
When the source data is stored in HDFS and reaches the terabyte level, the Spark Load method can be used to import the data into DorisDB. This requires that the deployed Spark process can access the HDFS data source. The import job executes asynchronously, and users can check the result with the SHOW LOAD command.
For other external data sources, as long as the Broker or Spark process can read the source, data can likewise be imported via Broker Load or Spark Load.
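As an illustration of this scenario, a minimal Broker Load job might look as follows; the label, paths, table name, broker name and credentials are all hypothetical placeholders, not values from the patent:

```sql
-- Asynchronous bulk import from HDFS into a DorisDB table (hypothetical names).
LOAD LABEL example_db.label_waybill_20220330
(
    DATA INFILE("hdfs://namenode:8020/warehouse/waybill/dt=2022-03-30/*")
    INTO TABLE ods_waybill
    COLUMNS TERMINATED BY ","
)
WITH BROKER "hdfs_broker"
(
    "username" = "hdfs_user",
    "password" = ""
)
PROPERTIES
(
    "timeout" = "3600"
);
-- The job runs asynchronously; progress is checked with:
-- SHOW LOAD WHERE LABEL = "label_waybill_20220330";
```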
2) Local file import
When data is stored in local files and the volume is under 10 GB, the Stream Load method can quickly import it into the DorisDB system. The import job is created over the HTTP protocol and executes synchronously; the user can judge whether the import succeeded from the return value of the HTTP request.
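Since the job is created over HTTP, it can be sketched as a single request against an FE node; the host, credentials, label and table below are illustrative assumptions:

```bash
# Synchronous Stream Load of a local CSV file (hypothetical host/table/label).
curl --location-trusted -u root:password \
     -H "label:waybill_20220330_1" \
     -H "column_separator:," \
     -T /tmp/waybill.csv \
     http://fe_host:8030/api/example_db/ods_waybill/_stream_load
# The HTTP response body reports the success or failure of the import.
```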
3) Kafka import
When the data comes from a streaming source such as Kafka and real-time data must be imported into the DorisDB system, the Routine Load method can be used. The user creates a routine import job over the MySQL protocol, and DorisDB continuously reads and imports data from Kafka.
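A minimal sketch of such a routine import job, created over the MySQL protocol; the job name, table, topic and broker list are hypothetical:

```sql
-- Continuous import from a Kafka topic into a DorisDB table (hypothetical names).
CREATE ROUTINE LOAD example_db.kafka_waybill_job ON ods_waybill
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "ods_waybill_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```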
4) Insert Into import
For manual testing and ad hoc data handling, the Insert Into method can write data into DorisDB tables. The statement INSERT INTO tbl SELECT ...; reads data from one DorisDB table and imports it into another; the statement INSERT INTO tbl VALUES (...); inserts a single row into the specified table.
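For example (table names hypothetical):

```sql
-- Read from one DorisDB table and import the result into another.
INSERT INTO dws_waybill_daily SELECT dt, COUNT(*) FROM ods_waybill GROUP BY dt;

-- Insert a single row into a specified table.
INSERT INTO ods_waybill VALUES ('2022-03-30', 'SF1234567890');
```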
5) In addition, several other import paths exist: JSON data import (semi-structured data such as JSON can be imported via Stream Load or Routine Load: Stream Load for JSON data stored in text files, and Routine Load for JSON-format data in Kafka); flink-connector-dorisdb (internally implemented by buffering and importing in batches via Stream Load); and DataX-dorisdb-writer (the DorisWriter plug-in writes data into the destination DorisDB table; under the hood, DorisWriter imports data into DorisDB via Stream Load in CSV or JSON format, buffering the rows read by the reader and importing them in batches to improve write performance; the overall data flow is source -> Reader -> DataX channel -> Writer -> DorisDB).
Data export (Export) is a DorisDB feature for exporting data and storing it on other media. It can export the data of a user-specified table or partition, in text format, through the Broker process to remote storage such as HDFS, Alibaba Cloud OSS or AWS S3 (or any object storage compatible with the S3 protocol).
After a user submits an export job, DorisDB enumerates all the tablets involved in the job, groups them, and generates a special query plan for each group. Each plan reads the data on its tablets and writes it, through the Broker, to the path specified on the remote storage. The processing flow mainly includes: 1) the user submits an Export job to the FE.
2) The FE's export scheduler executes the export job in two stages: PENDING, in which the FE generates an ExportPendingTask, sends a snapshot command to the BEs to snapshot all the tablets involved, and generates multiple query plans; and EXPORTING, in which the FE generates an ExportExportingTask and begins executing the query plans one by one.
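A minimal sketch of such an export job; the table, partition, destination path and broker name are hypothetical:

```sql
-- Export one partition of a table to HDFS through the Broker (hypothetical names).
EXPORT TABLE example_db.ods_waybill
PARTITION (p20220330)
TO "hdfs://namenode:8020/export/waybill/"
PROPERTIES ("column_separator" = ",")
WITH BROKER "hdfs_broker"
(
    "username" = "hdfs_user",
    "password" = ""
);
-- Progress can be checked with: SHOW EXPORT;
```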
Embodiment 2
Please refer to Fig. 2, a schematic system diagram of a Doris-based real-time data warehouse design example of the present invention. It includes a data middle platform 11, a Doris warehouse 12, a Flink real-time computing cluster 14 (hereinafter Flink), Kafka 13 and HDFS 15. The warehouse as a whole builds an enterprise-grade data warehouse around Doris 12. Through the data acquisition system, multiple acquisition channels, including MySQL binlog parsing (Canal), log collection (for example via Flume for Doris audit logs) and tracking-point interfaces, collect various kinds of heterogeneous data, which is funneled through the message queue (Kafka 13) to achieve highly concurrent data throughput while decoupling the warehouse from the computing engines. Flink 14 performs the ETL processing of the data and real-time statistics. The data middle platform 11 manages and controls the above, covering data quality management and full-lifecycle data management over data storage and data services: from data acquisition through ETL processing, data storage and data services, including metadata, data quality, data standards, data security and so on. HDFS 15 stores offline data for import into Doris 12. The data middle platform 11 may be implemented by many server clusters or servers; for convenience of description it is simply called the server below.
There are many possible principle systems for realizing a real-time data warehouse design; the above is only one example.
Please refer to Fig. 3, a flowchart of a Doris-based real-time data warehouse design method. It includes:
S110: importing business real-time data into a Kafka queue under a matching topic, providing at least one consumption task that consumes the data in the topic, and setting up a matching Routine Load task monitor;
S120: the Routine Load task monitor reading, in real time, the offset that indicates the position of the data currently being consumed within the topic;
S130: if the offset does not advance within a preset time, restarting the Routine Load task monitor and continuing the consumption task from that offset;
S140: importing the Stream Load data stream produced by the executed consumption task into Doris.
This flow covers business real-time data that needs no special or complex secondary processing and is imported into Doris in real time. Multiple topics are configured in the Kafka queue (as shown in Fig. 1: a first topic 131, a second topic 132, ..., an Nth topic 133). Business real-time data is imported into the Kafka queue on demand or by category and placed under its matching topic. If the Stream Load data stream consuming a topic is imported into Doris directly, the data turns out to be blocked and the job frequently dies; our engineers' analysis showed that the backlog arises because the import volume is large and the number of transactions is high. We therefore set up Routine Load task monitoring on the server. A consumption task that consumes the data of a topic is generally paired with one Routine Load task monitor. Among the first topic 131, second topic 132, ..., Nth topic 133, a single topic may have multiple consumption tasks (the first topic 131, for example, may have several), and new consumption tasks are created in real time as processing or statistics require. In the present invention, a corresponding Routine Load task monitor is created for each consumption task; when the offset tracked by the monitor does not change within a preset time (for example a few seconds), the Routine Load task is restarted and the consumption task continues from that offset, i.e. the import of the corresponding Stream Load data stream into Doris is restarted. In this example, Routine Load monitoring groups can also be established, one per topic; if several offsets within a group fail to advance within the preset time, all of the corresponding Routine Load monitors can be restarted together. If this condition recurs within the preset time, a notification requiring manual handling can be issued, which further improves processing efficiency and reliability. This example implements a custom Routine Load monitor that reads the job's offset in real time; once the offset is found to have stopped growing, the monitoring system stops the Routine Load task and restarts it, reusing the previous offset, which guarantees that data is neither lost nor duplicated while the Routine Load recovers.
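A minimal sketch of the statements such a monitor would issue against Doris; the job name is a hypothetical placeholder, and the stall-detection logic lives in the external monitoring system rather than in Doris itself:

```sql
-- Poll the job: the returned Progress field exposes the per-partition
-- Kafka offsets the job has committed; the monitor compares successive
-- readings to detect a stalled offset.
SHOW ROUTINE LOAD FOR example_db.kafka_waybill_job;

-- If the offset has not advanced within the preset window, stop and
-- restart the job. Doris persists the committed offsets, so consumption
-- resumes from the previous offset: no data is lost and none is duplicated.
PAUSE ROUTINE LOAD FOR example_db.kafka_waybill_job;
RESUME ROUTINE LOAD FOR example_db.kafka_waybill_job;
```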
In addition, in this example the most commonly used Stream Load path can be implemented through a custom Flink sink; the operation is straightforward, importing the data into Doris directly over HTTP. That is, importing the Stream Load data produced by the executed consumption task into Doris further includes:
creating, on the Flink real-time computing cluster, a sink that writes the Stream Load data stream to its final destination in Doris: flink-connector-dorisdb;
the Stream Load data is buffered and imported into Doris in batches directly over HTTP.
Specifically, the connector's jar package is first downloaded and placed in Flink's lib/ directory; then, after entering the CLI, executing the sink-creation command creates a sink to DorisDB (a sketch of such a command is given below). This creates a flink-connector-dorisdb sink: the final output destination of the Stream Load data stream in Doris. Adding an exactly-once data sink requires a two-phase-commit mechanism in the external system; since DorisDB lacks such a mechanism, the applicant relies on Flink's checkpoint-interval to save the buffered data groups and their labels at each checkpoint, and blocks the flushing of all data buffered in state while subsequent data arrives, thereby achieving exactly-once semantics. If DorisDB goes down, however, the user's Flink sink stream operator will block for a long time and trigger Flink monitoring alarms or a forced kill. The present invention imports in CSV format by default. If imports still stall, the memory of the Flink task can also be increased to improve throughput.
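The command itself is not reproduced in the text; as a sketch, a sink definition in the Flink SQL client typically follows the connector's documented pattern below, with the addresses, database, table, columns and credentials all illustrative assumptions to be checked against the connector version in use:

```sql
-- Hypothetical sink table; rows written to it are buffered and flushed
-- to Doris in batches via Stream Load over HTTP.
CREATE TABLE doris_sink (
    waybill_no VARCHAR,
    cnt        BIGINT
) WITH (
    'connector'     = 'dorisdb',
    'jdbc-url'      = 'jdbc:mysql://fe_host:9030',
    'load-url'      = 'fe_host:8030',
    'database-name' = 'example_db',
    'table-name'    = 'ods_waybill',
    'username'      = 'root',
    'password'      = '',
    'sink.buffer-flush.interval-ms' = '5000'
);
```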
Embodiment 3
Embodiment 2 above mainly concerns the import of business real-time data; on that basis, Embodiment 3 adds the import into Doris of real-time data that requires secondary business-logic processing. Before real-time import, much data and many tasks still require various kinds of secondary processing; for this case the present invention adopts the following scheme:
At least one piece of real-time data requiring secondary business-logic processing first passes through Flink's secondary business-logic processing, is then cached, and after processing completes the Stream Load data is imported into Doris. Specifically, this further includes: identifying the real-time data that requires secondary business-logic processing, first applying the secondary business logic to it in Flink, then caching the logically processed data, and finally importing the cached data into Doris via Stream Load. For example, if the data contains a large number of duplicate records, the secondary business-logic processing can include duplicate detection and deduplication; the deduplicated data is then cached and imported into Doris via Stream Load.
Passing at least one piece of real-time data requiring secondary business-logic processing through Flink's secondary business-logic processing further includes:
the secondary business logic of the at least one piece of real-time data is configured as a task management unit, which submits the matching job to the JobManager; the JobManager is responsible for scheduling the job and allocating resources;
the JobManager assigns the job to the corresponding TaskManager; upon receiving the job, the TaskManager starts threads to execute it and reports the task status and its own running status to the JobManager;
when the task finishes, the JobManager is notified and sends the data processed by the secondary business logic to the corresponding task management unit.
For example, passing at least one piece of real-time data requiring secondary business-logic processing through Flink's secondary business-logic processing further includes:
first consuming data from the ods log topic, and then, according to user behavior, dividing the behavioral data into three kinds: page logs, exposure logs and startup logs;
defining a Flink state variable holding a time value, used to filter new-user data: determine whether the user is new; on a first visit the state is empty, so the current time is written into the state; otherwise the state is non-empty and the current user is reclassified as an existing user;
splitting the filtered data: the startup logs and the exposure data are written to side-output streams, while the page data is written to the main stream; the corresponding values are obtained from the JSON payload, startup records are written to the startup side-output stream, the remaining records, which are all page logs, are written to the main stream, the records are then checked for being exposure logs, and those are written to the exposure side-output stream.
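A minimal Java sketch of these three steps, using Flink's keyed state and side outputs; the stream odsLogStream, the JSON field names (common, mid, start, displays, is_new), the tag names and the fastjson library are illustrative assumptions, not the patent's actual code:

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LogSplitJob {
    // Side-output tags: startup and exposure logs leave through side outputs,
    // page logs stay on the main stream.
    static final OutputTag<String> START_TAG = new OutputTag<String>("start") {};
    static final OutputTag<String> DISPLAY_TAG = new OutputTag<String>("display") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In production this would be a Kafka source consuming the ods log topic;
        // a literal element stands in here so the sketch is self-contained.
        DataStream<String> odsLogStream = env.fromElements(
            "{\"common\":{\"mid\":\"m1\",\"is_new\":\"1\"},\"page\":{\"page_id\":\"home\"}}");

        SingleOutputStreamOperator<String> pageStream = odsLogStream
            // Key by device id so the new-user state is tracked per user.
            .keyBy(log -> JSONObject.parseObject(log).getJSONObject("common").getString("mid"))
            .process(new KeyedProcessFunction<String, String, String>() {
                // State variable holding the first-visit time, used to tell new users from old ones.
                private transient ValueState<String> firstVisitTime;

                @Override
                public void open(Configuration parameters) {
                    firstVisitTime = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("firstVisitTime", String.class));
                }

                @Override
                public void processElement(String value, Context ctx, Collector<String> out)
                        throws Exception {
                    JSONObject json = JSONObject.parseObject(value);
                    // New-user filter: empty state means a first visit, so record the
                    // current time; otherwise re-mark the record as an existing user.
                    if (firstVisitTime.value() == null) {
                        firstVisitTime.update(String.valueOf(System.currentTimeMillis()));
                    } else {
                        json.getJSONObject("common").put("is_new", "0");
                    }
                    // Splitting: startup and exposure logs go to side outputs,
                    // everything else (page logs) goes to the main stream.
                    if (json.containsKey("start")) {
                        ctx.output(START_TAG, json.toJSONString());
                    } else {
                        out.collect(json.toJSONString());
                        if (json.containsKey("displays")) {
                            ctx.output(DISPLAY_TAG, json.toJSONString());
                        }
                    }
                }
            });

        // Each stream is recovered downstream, cached, and written to Doris via Stream Load.
        pageStream.getSideOutput(START_TAG).print("start");
        pageStream.getSideOutput(DISPLAY_TAG).print("display");
        pageStream.print("page");

        env.execute("ods-log-split");
    }
}
```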
Embodiment 4
Compared with Embodiment 2 or Embodiment 3, Embodiment 4 adds handling of the import scheme for offline business. It further includes: in an offline business scenario, importing the offline business data from HDFS into Doris using Broker Load; configuring scheduling tasks with DolphinScheduler via SQL, completing the processing of the offline business data in batches, and then aggregating the results into a summary table.
Scheduling tasks are configured in DolphinScheduler, i.e. it is configured which offline businesses require an import scheduling task; the corresponding data can then be loaded into Doris from HDFS and, as required, aggregated into a summary table. In addition, the import result can be queried with the following command:
show load where label="ts_202111221719";
CANCELLED: the import failed; the cause of the error can be inspected via the reported HTTP link.
LOADING: the import is in progress.
FINISHED: the import is complete.
Through the above command, each scheduling task is tracked and reported on, so that import results are known and acted upon promptly.
Embodiment 5
This example further includes: data that is relatively fixed in actual business scenarios is connected directly to the data service hall, which interfaces with the Power BI tool and front-end reports. The purpose of this step is to output relatively fixed data directly into front-end reports; for ad hoc data, a BI tool is provided for business personnel to query. Power BI is a remarkably capable product from Microsoft. Built on Excel's advanced features, it extends and develops them into a diverse collection of functions, culminating in this BI tool. Power BI can directly import and analyze data in common file formats, such as the familiar Excel and TXT, and can also connect directly to traditional databases and multidimensional databases. As the work of a major vendor, its stability and fluency in use are undeniable. In addition, Power BI offers a number of visual-interface options that, combined with the M language and DAX functions, support data processing.
Embodiment 6
As shown in Fig. 4, based on the same concept, the present invention also provides a Doris-based real-time data warehouse design device, including:
a Routine Load task monitor setup module 401, configured to import business real-time data into a Kafka queue under a matching topic, then provide at least one consumption task that consumes the data in the topic, and set up a matching Routine Load task monitor;
a Routine Load task monitor processing module 402, configured to have the Routine Load task monitor read, in real time, the offset that indicates the position of the data currently being consumed within the topic;
a Routine Load task monitor restart module 403, configured to restart the Routine Load task monitor if the offset does not advance within a preset time, continuing the consumption task from that offset;
an import module 404, configured to import the Stream Load data stream produced by the executed consumption task into Doris.
The real-time data warehouse design device can be integrated into the data middle platform described above, or can stand alone as a server.
实施例七Embodiment 7
如图5所示,基于相同的构思,本发明还提供一种计算机设备600,该 计算机设备600可因配置或性能不同而产生比较大的差异,可以包括一个或 一个以上处理器610(central processing units,CPU)710(例如,一个或一个以 上处理器)和存储器620,一个或一个以上存储应用程序633或数据632的存 储介质630(例如一个或一个以上海量存储设备)。其中,存储器620和存储介 质630可以是短暂存储或持久存储。存储在存储介质630的程序可以包括一 个或一个以上模块(图示没标出),每个模块可以包括对计算机设备600中的 一系列指令操作。更进一步地,处理器610可以设置为与存储介质630通信, 在计算机设备600上执行存储介质630中的一系列指令操作。As shown in FIG. 5, based on the same concept, the present invention also provides a
计算机设备600还可以包括一个或一个以上电源640,一个或一个以上 有线或无线网络接口650,一个或一个以上输入输出接口660,和/或,一个 或一个以上操作系统631,例如Windows Serve,Mac OS X,Unix,Linux, FreeBSD等等。本领域技术人员可以理解,图5示出的计算机设备结构并不 构成对计算机设备的限定,可以包括比图示更多或更少的部件或者组合某些 部件,或者不同的部件布置。
所述计算机可读指令被所述处理器执行时,通过将业务实时数据从 Flume或Canal中导入到Kafka,Kafka中业务实时数据经过多次数据处理, 数据处理后用SQL导入到Doris;在复杂的业务逻辑中,业务实时数据先通 过Flink处理,再利用Redis或PIKA缓存,处理完成既可以用Stream Load 方法导入Doris;在离线业务场景中,业务数据利用BrokerLoad将业务数据 从HDFS导入Doris,再通过SQL方式,使用DolphinScheduler跑批处理复杂业务数据,最后汇总到一个总表中;在实际业务场景中,直接对接数据服 务大厅,由数据服务大厅对接Power BI工具和前端报表。多表的票单量精确 去重,并保证数据的准确性;支持多种报表的导出和多种数据源的导入;数 据无积压,保障在数据量高峰访问时平稳运行。When the computer-readable instruction is executed by the processor, by importing the business real-time data from Flume or Canal into Kafka, the business real-time data in Kafka is processed multiple times, and the data is processed and imported into Doris with SQL; In the business logic, the real-time business data is first processed by Flink, and then cached by Redis or PIKA. After the processing is completed, the Stream Load method can be used to import it into Doris; in the offline business scenario, the business data is imported from HDFS to Doris by BrokerLoad, and then Through SQL, use DolphinScheduler to run batch processing of complex business data, and finally summarize it into a summary table; in actual business scenarios, it is directly connected to the data service hall, which is connected to Power BI tools and front-end reports. Accurately deduplicate the amount of tickets with multiple tables, and ensure the accuracy of the data; support the export of various reports and the import of various data sources; no data backlog, ensuring smooth operation during peak data volume access.
在一个实施例中,提出了一种可读存储介质,所述计算机可读指令被一 个或多个处理器执行时,使得一个或多个处理器执行上述一种基于Doris的 实时数仓设计方法,具体步骤在此不再赘述。In one embodiment, a readable storage medium is provided. When the computer-readable instructions are executed by one or more processors, the one or more processors execute the above-mentioned method for designing a real-time data warehouse based on Doris , the specific steps are not repeated here.
所属领域的技术人员可以清楚地了解到,为描述的方便和简洁,上述描 述的系统,装置和单元的具体工作过程,可以参考前述方法实施例中的对应 过程,在此不再赘述。Those skilled in the art can clearly understand that, for the convenience and brevity of description, the specific working process of the above-described system, device and unit can refer to the corresponding process in the foregoing method embodiments, which will not be repeated here.
所述集成的单元如果以软件功能单元的形式实现并作为独立的产品销售 或使用时,可以存储在一个计算机可读取存储介质中。基于这样的理解,本 发明的技术方案本质上或者说对现有技术做出贡献的部分或者该技术方案的 全部或部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个 存储介质中,包括若干指令用以使得一台计算机设备(可以是个人计算机,服 务器,或者网络设备等)执行本发明各个实施例所述方法的全部或部分步骤。 而前述的存储介质包括:U盘、移动硬盘、只读存储器(read-onlymemory, ROM)、随机存取存储器(random access memory,RAM)、磁碟或者光盘等各 种可以存储程序代码的介质。The integrated unit, if implemented in the form of a software functional unit and sold or used as a stand-alone product, may be stored in a computer-readable storage medium. Based on this understanding, the technical solution of the present invention is essentially or the part that contributes to the prior art, or all or part of the technical solution can be embodied in the form of a software product, and the computer software product is stored in a storage medium , including several instructions for causing a computer device (which may be a personal computer, a server, or a network device, etc.) to execute all or part of the steps of the methods described in the various embodiments of the present invention. The aforementioned storage medium includes: a U disk, a removable hard disk, a read-only memory (ROM), a random access memory (RAM), a magnetic disk or an optical disk and other media that can store program codes.
The above embodiments are intended only to illustrate the technical solutions of the present invention, not to limit them. Although the present invention has been described in detail with reference to the foregoing embodiments, those of ordinary skill in the art should understand that they may still modify the technical solutions described in the foregoing embodiments or replace some of their technical features with equivalents, and that such modifications or replacements do not cause the essence of the corresponding technical solutions to depart from the spirit and scope of the technical solutions of the embodiments of the present invention.
Claims (10)
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202210325028.7A CN114942916B (en) | 2022-03-30 | 2022-03-30 | Real-time data warehouse design method, device, equipment and storage medium based on Doris |
Publications (2)
| Publication Number | Publication Date |
|---|---|
| CN114942916A (en) | 2022-08-26 |
| CN114942916B (en) | 2024-12-06 |
Family
ID=82906903
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202210325028.7A Active CN114942916B (en) | 2022-03-30 | 2022-03-30 | Real-time data warehouse design method, device, equipment and storage medium based on Doris |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN114942916B (en) |
Patent Citations (3)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| WO2020243312A1 (en) * | 2019-05-28 | 2020-12-03 | Apple Inc. | Soft resource signaling in integrated access and backhaul systems |
| CN111741360A (en) * | 2020-06-19 | 2020-10-02 | 深圳市酷开网络科技有限公司 | Image application method and device based on open-source column type database and storage medium |
| CN113987086A (en) * | 2021-10-26 | 2022-01-28 | 北京百度网讯科技有限公司 | Data processing method, data processing device, electronic device, and storage medium |
Non-Patent Citations (3)
| Title |
|---|
| JACQUELINE LLANOS et al.: "A Novel Distributed Control Strategy for Optimal Dispatch of Isolated Microgrids Considering Congestion", IEEE TRANSACTIONS ON SMART GRID, vol. 10, no. 6, 28 March 2019 (2019-03-28), pages 6595-6607 * |
| 只是甲: "Doris Series 13 - Data Import via Routine Load" (Doris系列13-数据导入之Routine Load), pages 1-5, Retrieved from the Internet <URL:https://blog.csdn.net/u010520724/article/details/122425776> * |
| WU YING: "Research and Implementation of Key Technologies for a Bank Data Warehouse System", China Master's Theses Full-text Database, Information Science and Technology, no. 12, 15 December 2010 (2010-12-15), pages 138-139 * |
Cited By (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN120386780A (en) * | 2025-06-27 | 2025-07-29 | 山东捷瑞数字科技股份有限公司 | A Doris method for storing device data based on industrial Internet |
| CN120386780B (en) * | 2025-06-27 | 2025-09-26 | 山东捷瑞数字科技股份有限公司 | Device data storage Doris method based on industrial Internet |
Also Published As
| Publication number | Publication date |
|---|---|
| CN114942916B (en) | 2024-12-06 |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| US8938421B2 (en) | Method and a system for synchronizing data | |
| US20200301947A1 (en) | System and method to improve data synchronization and integration of heterogeneous databases distributed across enterprise and cloud using bi-directional transactional bus of asynchronous change data system | |
| US10621049B1 (en) | Consistent backups based on local node clock | |
| US11669427B2 (en) | Query-attempt processing in a database environment | |
| US20190311057A1 (en) | Order-independent multi-record hash generation and data filtering | |
| US20230018975A1 (en) | Monolith database to distributed database transformation | |
| WO2020228534A1 (en) | Micro-service component-based database system and related method | |
| CN103631870A (en) | System and method used for large-scale distributed data processing | |
| US11640347B2 (en) | Automated query retry execution in a database system | |
| CN110569142A (en) | A system and method for incremental synchronization of ORACLE data | |
| US12314247B2 (en) | Identifying software regressions based on query retry attempts in a database environment | |
| CN119003652A (en) | Method, device, medium and equipment for synchronizing data among multi-source databases | |
| CN116150263B (en) | Distributed graph calculation engine | |
| CN114942916A (en) | Doris-based real-time data bin design method, device, equipment and storage medium | |
| CN117076426A (en) | Traffic intelligent engine system construction method and device based on flow batch integration | |
| US20220318314A1 (en) | System and method of performing a query processing in a database system using distributed in-memory technique | |
| CN115718690A (en) | Data accuracy monitoring system and method | |
| CN118035270A (en) | Data query method, device, software program, equipment and storage medium | |
| Rothsberg | Evaluation of using NoSQL databases in an event sourcing system | |
| CN118550972A (en) | A micro-batch data collection and processing method, device and readable storage medium | |
| CN116932619A (en) | Massive data processing and data access method for ultra-large scale NFV network resource pool | |
| CN116860732A (en) | A data life cycle control method, device, electronic equipment and media | |
| CN116049242A (en) | A system that supports full-link log collection, storage, and query within an enterprise | |
| Al-Saeedi | Factors influencing the database selection for B2C web applications | |
| Zhao et al. | An Overview of the NoSQL World. |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| | PB01 | Publication | |
| | SE01 | Entry into force of request for substantive examination | |
| | GR01 | Patent grant | |