CN116010452A - Industrial data processing system and method based on stream type calculation engine and medium - Google Patents
Industrial data processing system and method based on stream type calculation engine and medium Download PDFInfo
- Publication number
- CN116010452A CN116010452A CN202111226614.8A CN202111226614A CN116010452A CN 116010452 A CN116010452 A CN 116010452A CN 202111226614 A CN202111226614 A CN 202111226614A CN 116010452 A CN116010452 A CN 116010452A
- Authority
- CN
- China
- Prior art keywords
- data
- stream
- computing engine
- processing
- sql
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Links
- 238000012545 processing Methods 0.000 title claims abstract description 83
- 238000000034 method Methods 0.000 title claims description 45
- 238000004364 calculation method Methods 0.000 title description 13
- 238000003860 storage Methods 0.000 claims abstract description 32
- 238000001914 filtration Methods 0.000 claims abstract description 27
- 238000011161 development Methods 0.000 claims abstract description 25
- 238000007781 pre-processing Methods 0.000 claims abstract description 10
- 239000002699 waste material Substances 0.000 claims abstract description 9
- 239000008280 blood Substances 0.000 claims abstract description 8
- 210000004369 blood Anatomy 0.000 claims abstract description 8
- 230000008569 process Effects 0.000 claims description 29
- 230000002159 abnormal effect Effects 0.000 claims description 27
- 230000006870 function Effects 0.000 claims description 23
- 238000004422 calculation algorithm Methods 0.000 claims description 12
- 238000012216 screening Methods 0.000 claims description 9
- 238000004140 cleaning Methods 0.000 claims description 7
- 238000005192 partition Methods 0.000 claims description 7
- 238000003672 processing method Methods 0.000 claims description 6
- 238000011144 upstream manufacturing Methods 0.000 claims description 6
- 238000011045 prefiltration Methods 0.000 claims description 4
- 238000004590 computer program Methods 0.000 claims description 3
- 239000013589 supplement Substances 0.000 claims description 3
- 238000004458 analytical method Methods 0.000 abstract description 18
- 238000005065 mining Methods 0.000 abstract description 7
- 238000012544 monitoring process Methods 0.000 description 24
- 238000005457 optimization Methods 0.000 description 11
- 238000007726 management method Methods 0.000 description 10
- 238000012549 training Methods 0.000 description 10
- 230000005540 biological transmission Effects 0.000 description 8
- 238000001514 detection method Methods 0.000 description 7
- 230000007246 mechanism Effects 0.000 description 7
- 230000008901 benefit Effects 0.000 description 6
- 238000013468 resource allocation Methods 0.000 description 6
- 238000004519 manufacturing process Methods 0.000 description 5
- 238000012790 confirmation Methods 0.000 description 4
- 238000010276 construction Methods 0.000 description 4
- 238000011156 evaluation Methods 0.000 description 4
- 230000010354 integration Effects 0.000 description 4
- 238000012360 testing method Methods 0.000 description 4
- 238000005303 weighing Methods 0.000 description 4
- 230000002776 aggregation Effects 0.000 description 3
- 238000004220 aggregation Methods 0.000 description 3
- 230000006399 behavior Effects 0.000 description 3
- 238000013480 data collection Methods 0.000 description 3
- 230000007547 defect Effects 0.000 description 3
- 238000005516 engineering process Methods 0.000 description 3
- 238000000605 extraction Methods 0.000 description 3
- 230000001965 increasing effect Effects 0.000 description 3
- 230000002085 persistent effect Effects 0.000 description 3
- 238000011084 recovery Methods 0.000 description 3
- 238000012384 transportation and delivery Methods 0.000 description 3
- 238000009825 accumulation Methods 0.000 description 2
- 230000009471 action Effects 0.000 description 2
- 239000003795 chemical substances by application Substances 0.000 description 2
- 238000004891 communication Methods 0.000 description 2
- 230000000295 complement effect Effects 0.000 description 2
- 230000006835 compression Effects 0.000 description 2
- 238000007906 compression Methods 0.000 description 2
- 238000003745 diagnosis Methods 0.000 description 2
- 230000000694 effects Effects 0.000 description 2
- 238000009776 industrial production Methods 0.000 description 2
- 238000003780 insertion Methods 0.000 description 2
- 230000037431 insertion Effects 0.000 description 2
- 238000010801 machine learning Methods 0.000 description 2
- 238000012423 maintenance Methods 0.000 description 2
- 238000012986 modification Methods 0.000 description 2
- 230000004048 modification Effects 0.000 description 2
- 238000013433 optimization analysis Methods 0.000 description 2
- 238000012795 verification Methods 0.000 description 2
- 238000012800 visualization Methods 0.000 description 2
- 238000012952 Resampling Methods 0.000 description 1
- 230000001133 acceleration Effects 0.000 description 1
- 230000004931 aggregating effect Effects 0.000 description 1
- 238000012098 association analyses Methods 0.000 description 1
- 230000009286 beneficial effect Effects 0.000 description 1
- 230000008859 change Effects 0.000 description 1
- 230000008878 coupling Effects 0.000 description 1
- 238000010168 coupling process Methods 0.000 description 1
- 238000005859 coupling reaction Methods 0.000 description 1
- 238000013500 data storage Methods 0.000 description 1
- 230000009849 deactivation Effects 0.000 description 1
- 230000001934 delay Effects 0.000 description 1
- 230000001419 dependent effect Effects 0.000 description 1
- 238000013461 design Methods 0.000 description 1
- 238000010586 diagram Methods 0.000 description 1
- 238000009826 distribution Methods 0.000 description 1
- 230000003631 expected effect Effects 0.000 description 1
- 238000001125 extrusion Methods 0.000 description 1
- 230000006872 improvement Effects 0.000 description 1
- 230000001939 inductive effect Effects 0.000 description 1
- 230000010365 information processing Effects 0.000 description 1
- 238000007689 inspection Methods 0.000 description 1
- 230000002452 interceptive effect Effects 0.000 description 1
- 238000013138 pruning Methods 0.000 description 1
- 238000007670 refining Methods 0.000 description 1
- 230000001172 regenerating effect Effects 0.000 description 1
- 230000001105 regulatory effect Effects 0.000 description 1
- 230000008439 repair process Effects 0.000 description 1
- 238000012552 review Methods 0.000 description 1
- 238000010187 selection method Methods 0.000 description 1
- 230000035945 sensitivity Effects 0.000 description 1
- 230000001502 supplementing effect Effects 0.000 description 1
Images
Classifications
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
The invention provides a data processing system based on a stream computing engine, which comprises a data acquisition module: after acquiring original data from a terminal, synchronously compiling coding and decoding protocols of a transmitting end and a receiving end, and matching corresponding table information from an unbounded data stream to finish the acquisition of information events; front-end filter module: filtering the light weight data provided by the collected and screened information through a bloom filter, and synchronously filtering mass data provided by K-V storage; and a data preprocessing module: the stream computing engine provides stream batch unified stream processing, avoids resource waste generated by offline and real-time two sets of service development, and abstracts data stream by providing SQL support; data blood margin module: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a platform to analyze and start the operation. The invention realizes the mining of the data value and the processing analysis of unstructured data.
Description
Technical Field
The invention relates to the technical field of data processing, in particular to a data processing system, a data processing method and a data processing medium based on a stream computing engine.
Background
The industrial Internet is a brand new industrial ecology, key infrastructure and novel application mode which are formed by deeply fusing a new generation of information communication technology and industrial economy, takes a network as a base, takes a platform as a center, takes data as an element and takes safety as a guarantee, and improves the efficiency of industrial production by collecting and mining the data, applying the technology to the ground and intelligently modifying the flow.
With the advancement of informatization and intelligent reformation of industrial production, the data volume generated in the production process rises exponentially. The complex data are distributed in sensor equipment, internet of things equipment, electronic commerce, enterprise communication tools and the like, and have the characteristics of complex sources, unstructured, non-distributed and the like, so that challenges are provided for mining of data value and processing analysis of unstructured data.
The data source diversification can put higher requirements on the connection expansion capacity of the data integration system, and the distributed big data storage system releases the pressure of the service while bringing stronger capacity to the service, so that the acceleration expansion of the data volume is promoted. The rapid growth in data magnitude requires higher demands on both throughput and real-time performance of the data integration platform. Of course, as a data-dependent basic system, data accuracy is the basic requirement.
In addition, the system needs to understand and integrate the data scattered on different business teams, manage and ensure the safety of data access, and the whole process is relatively complex. While the platformization can automate complex processes, the high costs inherent in data integration work are not completely eliminated in a platformized manner. The greatest possible increase in the reusability and controllability of the process is therefore also a challenge that data integration systems need to continuously cope with.
Meanwhile, streaming data is now confronted with the need to query the persistent database, and the problem that stress is generated on the underlying database is still unresolved.
Through retrieval, patent document CN111475682a discloses an intelligent operation and maintenance platform based on a super-large-scale data system, which adopts an open and expandable data acquisition bottom architecture, is used for butting multiple types of data sources, and reduces a big data acquisition threshold. And viewing all kinds of data in the system from the full view, tracking the whole life cycle of each data node, and sharing and showing the link relation among each data node. And if the log monitoring is performed through the system node, the blood-edge relationship of the data is managed and managed through the link monitoring of multiple dimensions such as users, data, job tasks, API, services and the like, the traceability of fine granularity data and operation is ensured, and the full life cycle management of the data is completed. The prior art can perform contrast verification on data and original data by highlighting, so that the integrity and auditability of the data are ensured, but the mining of the data value and the processing analysis of unstructured data cannot be solved.
Patent document CN109637090a discloses a disaster monitoring and early warning platform construction method based on an SOA architecture, which solves the defects that the current geological disaster monitoring and early warning is performed mostly through manual inspection and monitoring equipment threshold alarming, but due to the diversity and uncertainty of factors inducing geological disasters, complicated geographical environment factors are commonly inserted and blended, and the management and evaluation of data become extremely difficult. The invention fully utilizes the low coupling characteristic of the SOA architecture to better combine the AI early warning service with the monitoring system, simultaneously utilizes the advantages of processing complex data and accurately classifying by a machine learning algorithm, monitors the data of each monitoring object in real time on the basis of the risk evaluation of a risk source and the evaluation of the monitoring object, predicts the grade of a series of geological disasters with different intensities which possibly occur in a certain period in a dangerous area, proposes various countermeasures for reducing risks according to the characteristics of different dangerous areas, and provides auxiliary decisions for geological disaster monitoring and early warning. However, the prior art still cannot process complex events, and real-time warning cannot be achieved for modes which accord with certain characteristics in streaming data and trigger corresponding follow-up actions.
Therefore, there is a need to develop a system and method for solving the problem encountered in the industrial intelligent improvement of the flow computing business in the ground.
Disclosure of Invention
Aiming at the defects in the prior art, the invention aims to provide a data processing system, a data processing method and a data processing medium based on a stream computing engine, so that the mining of data value and the processing analysis of unstructured data are realized, and the reusability and the controllability of a flow are improved.
According to the present invention, there is provided a data processing system based on a stream computing engine, comprising:
and a data acquisition module: after acquiring original data from a terminal, synchronously compiling coding and decoding protocols of a transmitting end and a receiving end, matching corresponding table information from an unbounded data stream to finish the acquisition of information events, and cleaning and screening service field information;
front-end filter module: filtering the light weight data provided by the collected and screened information through a bloom filter, and synchronously filtering the heavy weight data provided by Key-Value storage;
and a data preprocessing module: the streaming computing engine provides a unified information processing path of the capability of unifying streaming batches, avoids resource waste generated by offline and real-time two sets of service development, and abstracts the data flow by providing SQL support;
Data blood margin module: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a computing platform for analyzing and starting the operation.
Preferably, when the information event is collected in the data collection module, the log file is in a mixed format during writing, records in a row format and a status format are required to be respectively analyzed according to the record storage format, the log file is updated into the log file after the stream computing engine submits the transaction to refresh to the disk, and the abnormal restarting condition of the operation is required to be responded by the consumption point of the log.
Preferably, after the data acquisition module acquires the unbounded data stream, the unbounded data stream is sent to a consumption queue, peak clipping and valley filling of data are completed by high throughput processing provided by the message queue, and multi-partition setting is adopted for the message subject of the message queue.
Preferably, the data acquisition module provides storage support including HDFS and Kafka in the management and acquisition stage of data, and simultaneously supports data stored in the multi-parallelism consumption message queue, so as to ensure that the consumption speed of the data link does not form a short board at the entrance.
Preferably, the pre-filtering module uses a hash function built in the bloom filter to de-duplicate the data;
-merging the message records holding the same Key using Key-Value store;
the hash algorithm is used to convert the key into an integer for storage, which takes up a maximum of 8 bytes.
Preferably, the pre-filtering module filters illegal data by a user-defined service threshold or a filter operator for compiling a user-defined filtering rule for the data flow after the duplication removal processing.
Preferably, the data blood-source module defines the metadata of the data specified by the database mode definition language during development, and then writes the data specified by the data manipulation language, defines the real-time data flow as a real-time table to be stored in the metadata storage component, and a user directly selects a required data source from the real-time table, and then can complete the job development only by writing sentences of the data manipulation language.
Preferably, the data blood-source module supplements the converted database mode definition language statement according to the upstream and downstream databases when the task is submitted, forms a complete Streaming SQL script for submitting, and communicates development flows of batch processing service and stream processing service.
The data processing method based on the stream computing engine provided by the invention comprises the following steps:
And a data acquisition step: after acquiring original data from a terminal, synchronously compiling coding and decoding protocols of a transmitting end and a receiving end, matching corresponding table information from an unbounded data stream to finish the acquisition of information events, and cleaning and screening service field information;
the pre-filtration step: filtering the light weight data provided by the collected and screened information through a bloom filter, and synchronously filtering the heavy weight data provided by Key-Value storage;
a data preprocessing step: the streaming computing engine provides a unified message processing path with integrated streaming batch capability, avoids resource waste generated by offline and real-time two sets of service development, and abstracts the data flow by providing SQL support;
data blood-edge step: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a computing platform for analyzing and starting the operation.
According to the present invention there is provided a computer readable storage medium storing a computer program which when executed by a processor performs the steps of the method described above.
Compared with the prior art, the invention has the following beneficial effects:
1. According to the invention, through compression and transmission of the original data, definition mining among data indexes, preprocessing of aggregation calculation using data, standard management of data blood edges and the like, mining of data value and processing analysis of unstructured data are realized.
2. According to the invention, after the original data is acquired, the coding and decoding protocols of the transmitting end and the receiving end are synchronously compiled, the reliability and the transmission efficiency of data transmission are considered, and meanwhile, the data uploaded periodically are compressed to improve the data transmission efficiency.
3. The invention realizes the function of extracting the service field information carried in the log message header after cleaning and screening a plurality of internal service fields contained in the log in the production process, thereby providing the extraction and analysis capability of the service log.
4. The invention realizes the de-duplication of data through the hash function built in the bloom filter, so that the data does not need to be stored during operation and only is represented by bits, thus the space occupation has great advantage compared with the traditional mode, and the data can be kept secret; the execution efficiency of the deduplication algorithm is high, and the time complexity of insertion and query is O (k); the hash functions are mutually independent, and can be calculated in parallel at the hardware instruction level if necessary, so that the processing efficiency is improved.
5. The invention can support the changed data structure by tracing the blood relationship of the data schema when the computing node possibly generates the change of the data structure through the dimension table association and the multi-stream convergence.
Drawings
Other features, objects and advantages of the present invention will become more apparent upon reading of the detailed description of non-limiting embodiments, given with reference to the accompanying drawings in which:
FIG. 1 is a flow diagram of the overall framework of a data processing system based on a streaming computing engine in accordance with the present invention;
FIG. 2 is a flow chart of steps of a data processing method based on a stream computing engine according to the present invention;
FIG. 3 is a task scheduling flow chart of the industrial data monitoring system based on the stream computing engine in the present invention.
Detailed Description
The present invention will be described in detail with reference to specific examples. The following examples will assist those skilled in the art in further understanding the present invention, but are not intended to limit the invention in any way. It should be noted that variations and modifications could be made by those skilled in the art without departing from the inventive concept. These are all within the scope of the present invention.
As shown in FIG. 1, the present invention provides a data processing system based on a streaming computing engine, comprising:
And a data acquisition module: after the original data is acquired from the terminal, coding and decoding protocols of the transmitting end and the receiving end are synchronously compiled, the acquisition of the information event is completed by matching corresponding table information in the unbounded data stream, and meanwhile, the service field information is cleaned and screened. Specifically:
the collection and extraction of data in a message is complex, including many difficulties such as how to process sensor signal data, how to parse database binary log files, and how to filter the service logs in the production process.
When sensor signal data is processed, after original data is acquired from terminal equipment, coding and decoding protocols of a transmitting end and a receiving end are required to be synchronously compiled, reliability and transmission efficiency of data transmission are considered, and meanwhile, periodically uploaded data are compressed to improve data transmission efficiency.
When the binary log file of the database is analyzed, corresponding table information is matched from the unbounded data stream firstly, so that information event acquisition is completed. When the log file is written in a mixed format, records in a row format and a statement format are needed to be analyzed according to the record storage format, the log file is updated into the log file after the stream computing engine submits transaction refreshing to a disk, and the condition that the consumption point of the log is very restarted is needed to be recorded.
When the service logs in the production process are screened, the logs in the production process contain a plurality of internal service fields, and the cleaning and screening are required, so that the function of extracting service field information carried in the message header of the logs is realized, and the extraction and analysis capability of the service logs is provided.
The device diagnosis data related in the above scenario is collected by the MEMS sensor, and analog detection values of humidity, temperature, pressure and the like are converted into digital signals to be transmitted to the diagnosis system, and binary data in the sensor register needs to be reconstructed into data effective for analysis operation in the data processing part, which specifically includes:
1. and restoring measured value data through a collection calculation formula.
2. And eliminating zero offset errors of the acquired data in the measured value by recording the initial value acquired by the sensor.
3. And the reasonable cut-off frequency of the filter is selected, so that out-of-band noise is reduced, and errors caused by random noise signals are reduced.
After being collected or pushed to the system, the unbounded information flow is firstly sent to a consumption queue, and peak clipping and valley filling of the data are completed by means of high throughput capacity provided by the information queue, so that the pressure of the processing system is not increased suddenly due to conduction on sensor data with the same collection interval. And the concurrency efficiency is improved by adopting multi-partition setting for the message theme of the message queue.
In the management and acquisition stage of data, a very rich connector assembly is provided, and the connector assembly comprises a plurality of storage supports such as HDFS, kafka and the like, and simultaneously supports data stored in a consumption message queue with multiple parallelism degrees, so that the consumption speed of a data link is ensured not to form a short board at an entrance.
The present invention extends the database interface to a certain extent, and transmits the data information of all accessed data sources to the downstream through the database.
Front-end filter module: filtering and weighing the collected and screened information through lightweight data provided by a bloom filter, and synchronously filtering and weighing the mass data provided by Key-Value storage. Specifically:
the upstream flow often causes data repetition in links such as data acquisition, message delivery, format analysis and the like, the data repetition can influence downstream application such as data monitoring, besides the traditional use of statistics such as UV and the like, the significance of deduplication is more in eliminating dirty data generated by unreliable data sources, namely the influence of repeated reporting of data or repeated delivery of data, so that the result generated by stream calculation is more accurate.
The invention realizes the lightweight data filtering provided by the bloom filter, and when an upstream data source can only guarantee at least once delivering semantics, downstream statistical data is higher.
The bloom filter realizes the de-duplication of data through a built-in hash function, and has the advantages that:
1. the operation does not need to store data and only uses bits to represent, so the space occupation has great advantage compared with the traditional mode and can secret the data.
2. The execution efficiency of the deduplication algorithm is high, and the time complexity of insertion and query is O (k).
3. The hash functions are mutually independent, and can be calculated in parallel at the hardware instruction level if necessary, so that the processing efficiency is improved.
However, bloom filters do not guarantee complete accuracy of the filtering, such as where a 100% accuracy is not applicable.
For scenes needing accurate deduplication, the invention realizes a deduplication mode for providing mass data through Key-Value storage characteristics. Message records holding the same Key are consolidated by Key-Value storage features such as the RocksDB state backend provided by the Flink engine. The method is suitable for the conditions of high requirements on service data and intolerance of errors. While providing accurate filtering capability, it is necessary to perform fine state management on the job performing the filtering process, and by setting timeout time and configuring incremental checkpoints, unrestricted state growth is avoided.
Considering the problem that state expansion occurs when the key occupation space of data is large, a hash algorithm is adopted to convert the key into integer and then store the integer, so that the key occupies 8 bytes at most. However, since the hash algorithm cannot ensure that no conflict is generated, whether the starting is required to be performed or not needs to be determined according to the service scene. And filtering illegal data by the reprocessed data flow through a filter operator for realizing a self-defined service threshold or compiling a self-defined filtering rule, and reducing the downstream calculation pressure. And meanwhile, unpacking the original data is completed, and the original data is packed into a data format required to be used according to service requirements. Finally, the data flow record entering the flow type computing system is marked with a time stamp, so that the subsequent processing is convenient.
And a data preprocessing module: the streaming computing engine provides a unified flow batch capability unified message processing path, avoids resource waste generated by offline and real-time two sets of service development, and abstracts the data flow by providing SQL support. Specifically:
in the data preprocessing and feature engineering stage, the streaming computing engine aims at providing the unified computing capacity of the streaming batch, avoiding the resource waste generated by offline and real-time two sets of business development, abstracting the data flow by providing SQL support, and improving the efficiency of the job development. Based on the characteristics, compared with the traditional scheme, the method and the device can unify development logics of two links in real time and off-line by means of the unified calculation engine so as to avoid the phenomenon of inconsistent calculation semantics caused by inconsistent data processing links, and therefore downstream data do not need to be repeatedly cleaned and filtered.
The filtered industrial data also needs to be effectively tailored in the data preprocessing stage:
1. and consuming and acquiring the data packet sent by the collector from the message queue, and analyzing according to a transmission protocol.
2. Splitting the data stream through the collector and the device identifier, and then storing the data of the same identifier in the same data stream.
3. And combining data according to the preset data processing group, and combining the data subjected to association analysis into the same group.
4. And aggregating the data according to the time window, and regenerating the aggregated data into an unbounded stream through sliding of the window.
5. Based on simple threshold logic, the data is cleaned, and data which does not meet the requirement or data which is obviously abnormal is recorded into an abnormal branch flow for system optimization analysis.
6. And recording the effective data duty ratio and the processing rule hit number in the data processing process for the system optimization analysis.
Data blood margin module: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a computing platform for analyzing and starting the operation. Specifically:
the SQL abstract capability provided by the stream computing engine can be used for rapidly drawing the processing process of data in the task operation. Therefore, the main means for submitting the construction flow calculation task in the system is to write the flow of script description data by adopting SQL language, and then submit the flow to the platform to analyze and start the operation.
Usually, when developing, the metadata of the DDL (database schema definition language) specified data needs to be defined first, and then the DML (database operation language) specified data processing process is rewritten, and DDL (database schema definition language) definition sentences in most tasks are repeated but do not obtain benefits in job development. Metadata information is also stored in the meta store component store of the persistent store, and can also be retrieved in the upstream and downstream databases. Therefore, the real-time data flow is defined as a real-time table to be stored in the metastore component, a user can directly select a required data source from the real-time table, and then the job development can be completed only by writing a DML statement. When the task is submitted, the system supplements the converted DDL statement according to upstream and downstream category to form a complete streamingSQL script for submission. The development difficulty of the operation can be further reduced, and the development flow of the batch processing service and the stream processing service can be further opened, so that the development steps of the batch processing service and the stream processing service are more similar.
In addition, the rules engine of the solution supports the continual upgrading of configuration rules. Metadata hosted at the metastore can provide some compatibility in the event of inconsistent data schemas introduced by different data nodes. In the data processing process, the preprocessed data is generally written into a message queue, then online training is carried out, a dynamic model is continuously generated in the training process, and then the model is pushed to an online reasoning module for reasoning. Online machine learning is characterized by dynamic updating, continuous training and continuous verification of models. Meanwhile, more complicated strategies such as model monitoring, model deployment, model rollback and the like are needed. Changes to the data structure may be caused by multi-stream aggregation via dimension table association at the computing node, and support can be provided for the changed data structure by tracing back the blood-edge relationship of the data schema.
As shown in fig. 2, the present invention further provides a data processing method based on a stream computing engine, which includes the following steps:
and a data acquisition step: after the original data is acquired from the terminal, coding and decoding protocols of the transmitting end and the receiving end are synchronously compiled, the acquisition of the information event is completed by matching corresponding table information in the unbounded data stream, and meanwhile, the service field information is cleaned and screened.
The pre-filtration step: filtering and weighing the collected and screened information through lightweight data provided by a bloom filter, and synchronously filtering and weighing the mass data provided by Key-Value storage.
A data preprocessing step: the streaming computing engine provides a unified flow batch capability unified message processing path, avoids resource waste generated by offline and real-time two sets of service development, and abstracts the data flow by providing SQL support.
Data blood-edge step: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a computing platform for analyzing and starting the operation.
The invention further provides a computer readable storage medium storing a computer program which, when executed by a processor, performs the steps of the method described above.
The method for constructing the task is mainly SQL, and an editor of the StreamingSQL is developed, so that the support of the SQL specification defined by the system is provided, and StromSQL, sparkSQL, KSQL, hiveSQL and FlinkSQL grammars are compatible. In addition to providing support for engine dialect SQL, SQL templates are provided. And a user of the platform can conveniently and rapidly develop SQL tasks of a preset scene based on the SQL template. In addition, web-type guide pages are provided for business personnel, so that the business personnel can be helped to construct streaming processing tasks in a form filling mode without writing codes.
In addition, the editor also provides a component library of the custom function, which contains rich built-in functions including time functions, set functions, json processing functions and character string functions. The rich built-in functions can facilitate the development of users and save the development time of the users themselves. The component library also contains a use instruction written by a developer, a use case and the like, and is used for the platform user to search and review. The component library also provides a cloud platform for a system user to upload and download the custom functions and custom operators accumulated in the service, thereby facilitating data sharing and subsequent development support in different system deployment.
The editor also realizes grammar detection and intelligent prompt for the streamingSQL, and a user can make real-time grammar detection and give prompt to a certain extent in the process of writing sentences, and can also provide the completion function of metadata for tables, fields and the like involved in the sentences, thereby optimizing development experience.
The editor supports the functionality of online debugging at the same time, which is very important for streaming computing. The situation that the tasks need to be online and then adjusted according to the observed results is avoided, and the development cost is saved. The system can accept text file as data source to check whether the output accords with the expectations, and can sample the message theme in the message queue or the data in the database table to carry out service logic check.
In the SQL parsing process, the most complex is the widening of the data table by dimension table association. In the invention, table association is performed through thermal storage. After Data is imported from a Data source, the system accesses the back end by using an Async I/O technology, and the system back end accesses the storage of the back end by using a Data Access interface. The system back-end storage support acquires data from data pipelines such as a distributed storage system, a structured database, a NoSQL database, a message queue and the like, and the back-end can Cache the data in the LRU Cache module. By broadcasting in the message queue at the time of updating the dimension table, the condition that the dimension data is not updated timely is reduced. The storage support of various big data tools is developed for the data after dimension table association in the system, thereby greatly increasing the compatibility of the system.
Since rule matching conditions and business requirement scenes are constantly changed, the rules often need to be frequently adjusted according to actual changes. And the business personnel performs the operations of adding, deleting and checking the rule factors in the rule base on the front-end feature management interface, and constructs different feature rules by combining different rule factors. And then, a result set of the test can be designated to simply test the constructed rule, and the test result is stored integrally after being suitable. The rule base is not directly changed during each operation, so that decoupling of the business rule and the rule factor is realized.
In the aspect of measuring the overall effect, whether the rule is reasonable or not is determined through analysis of the hit rate of the rule and the alarm frequency of the rule.
It is necessary to determine whether the rule fails, such as a sudden decrease in the interception rate.
It is determined if the rule is superfluous, e.g. a rule never intercepts any event.
Whether the rule has a vulnerability or not is judged, for example, after a certain operation is performed, but the expected effect is not achieved.
In addition, the rule gray scale online mechanism is also constructed based on the overall effect evaluation of the rule, and the white list is set for different areas, or the real data test result is imported, or the optimization rule release flow such as the current limiting online formal environment is adopted.
In the aspect of accumulating analysis optimization data, a combination rule needs to be found, and each step in operation is prevented from being normally available by identifying a combination of certain behaviors, but the behavior result is accumulated to be abnormal; group identification needs to be made, such as by graph analysis techniques, to find the group and label the group to prevent the occurrence of abnormal conditions in the whole area, where each part is normally represented.
In the regular pattern matching of the invention, the Rate algorithm is used for improving the matching efficiency, and the time redundancy caused by repeated calculation is reduced. When the number of rules and fact samples are large, each piece of fact data needs to be matched with Alpha nodes in the Rete network. Most of the rules contain the same conditional atoms, namely, the conditional atoms contained by a plurality of rules exist at the same time, and a certain time waste exists when the conditional atoms are matched with each Alpha node in sequence. Thus, a pre-match module aggregates multiple rules into a small set of rules. And filtering part of normal data in a pre-matching stage through rule set screening, and reducing the matching times of facts and nodes. The implementation logic is to divide rules containing a plurality of same condition atoms into the same rule group, and the condition atom with the largest occurrence number in the rule group is used as the characteristic condition of the rule group. And filtering partial data by screening the rule group in the pre-matching module, and executing rule judgment in the rule group on the residual sample.
The goal of rule learning is to generate a rule set that can cover as many samples as possible, the most straightforward way is sequential coverage, i.e. piece by piece, which is also the simplest training concept of rule learning, after each training in the training set generates a rule, the samples covered by the rule are taken out of the training set, and then another set of rules is trained with the remaining samples. Of course, to avoid over-fitting of the model, general algorithms will incorporate pruning optimization strategies.
To train a highly useful model, excellent features are essential. The important characteristics of the excellent characteristics are that the degree of distinction is higher, so that algorithm developers and business personnel with high sensitivity are required to be matched tightly, configured rules are analyzed, good rule factors are extracted, meanwhile, the algorithm developers are quite familiar with business processes, and the high-availability characteristics are created by combining actual conditions. For actual business, in many cases, the addition of an accurate feature is more effective than the model parameter tuning and even the model optimization, and meanwhile, the accurate feature can also prevent the occurrence of over fitting. In the invention, detailed service parameters can be extracted for combination screening by analyzing and processing the historical data and combing the flow of the industrial monitoring service.
An important requirement of model training is that the number of samples is as large as possible, however, the monitoring service has the defect in nature, after abnormal rule configuration, the number of actually screened cases is very few, which leads to extremely unbalanced number of positive and negative samples, so the invention adopts a resampling method to increase the number of positive samples, specifically, the invention resamples the numerical features contained in the samples by adopting an SMOTE method, resamples the labeled features by adopting a random selection method, and finally combines the two to form a new sample. In the actual training process, parameters such as the maximum iteration times, the step length, the maximum tree depth and the like contained in the algorithm are required to be properly adjusted for a plurality of times, and the parameter with the highest recognition rate is selected.
The event stream processed by the system is continuously increased, and the business rule can follow the data set to carry out certain evolution. And after verifying the validity of the rule through the real-time business data and the archiving history data, the rule engine can have certain evolution capability.
Based on the invention, the invention provides an industrial data monitoring system based on a stream computing engine, which can be added into the industrial Internet of things and comprises the following components:
as shown in fig. 3, the status monitor module: and acquiring and storing real-time indexes by adopting Prometaus supported by a streaming computing engine, regularly grabbing all child node indexes through meta nodes for summarization, and providing a unified data source for Grafana for visualization and alarm configuration. Specifically:
A large amount of runtime information is provided for a front-end interface designed for a streaming computing engine for a user to know the current running condition of a task, but the problem that historical state monitoring cannot be obtained is caused to cause that the user cannot know the historical running state of the task. All child node indexes are captured at fixed time through meta nodes for summarization, so that unified data sources are conveniently provided for Grafana for visualization and alarm configuration is carried out through a dashboard.
In the process of streaming computing, since data continuously flows into the cleaning process, it is difficult to monitor the operation condition of the job. Even if checkpoints are opened, it is not possible to determine whether or how much data was lost. Therefore, the invention designs a heartbeat mechanism, and heartbeat information is injected for each job to monitor the running condition of the job.
With the incoming of each data source, the metadata service generates a set of timed heartbeat signals for consumption by the job. After the heartbeat information flows into each job, the heartbeat information passes through each node along with the data flow, the label of the current node is marked on each node, and then the processing logic flow of the node is skipped to the next node. When the job component passes through the tail end, the job component is collected to the display page through JMX.
The index includes the time of generation of the heartbeat signal, the time of inflow work, and the time of arrival at each node. The time taken by a piece of data to be processed by the whole pipeline and the time taken by each node to process the data can be judged through the index, so that the performance bottleneck of the operation is judged.
Since the heartbeat signal is transmitted at a fixed time, the number of heartbeat information received per job should be uniform. If the number of the finally sent indexes is inconsistent with the expected number, whether data are lost can be further judged.
Throughput and latency are the most important indicators for measuring real-time task performance, and task concurrency and resource allocation often need to be adjusted through the two indicators. If the native latency parameter of the computing engine is started to track task delay tracking, cluster and task performance can be obviously affected, the invention adopts message topic consumption accumulation as an index for measuring task delay, obtains the consumption offset in engine operation through JMX to reversely check the extrusion condition of the persistent message topic in the message queue, and simultaneously comprehensively analyzes and deduces whether the operation has data accumulation or not through the backpressure condition of JMX acquisition operation nodes provided by the engine.
In order to meet the requirement of large-scale data processing, a streaming computing engine generally provides the capability of distributed computing, all tasks submitted to a platform are uniformly scheduled to any computing node by a scheduler, so that the running logs of the tasks are distributed on different machines, and the user positioning logs is difficult. According to the method, the log4j log frame default mechanism is adjusted, the task log is segmented according to days, the outdated log is cleaned regularly, the situation that the computing nodes are not available due to the fact that abnormal tasks are frequently written into disks is avoided, meanwhile, the logs are collected in real time by deploying agents at all the computing nodes, collected into the message queue and then collected into the non-relational database such as the elastic search, and a user can conveniently and quickly find the corresponding log of the operation when the abnormal information needs to be judged through log positioning.
And a resource allocation module: and analyzing the real-time task to provide enough memory for the operation, simultaneously processing the task message in real time, and obtaining a real-time task resource preset value by combining the related index obtained by the real-time task memory analysis and the rationality of the real-time task concurrency, and adjusting the real-time task resource to achieve the real-time task resource configuration. Specifically:
for the real-time task resource analysis thought, mainly include two points:
On the one hand, from the memory occupied by the operation, the real-time task is analyzed from the aspect of the memory of the runtime heap, and enough memory is provided to ensure the stable execution of the operation.
On the other hand, starting from the real-time task message processing capability, the CPU resource is reasonably used as much as possible while meeting the data processing requirement.
And then, combining the related indexes obtained by the real-time task memory analysis and the rationality of the real-time task concurrency degree to obtain a real-time task resource preset value, and adjusting the real-time task resource to finally achieve the purpose of rationalizing the real-time task resource allocation, thereby better reducing the use cost of the machine.
In the aspect of task memory management and control, the system can regularly scan all running streaming computing tasks according to a configuration interval, combines real-time task GC logs, calculates the recommended heap memory size of the streaming computing tasks according to a memory optimization rule, compares the recommended heap memory size with the heap memory of the streaming computing tasks which are actually allocated, can consider that the memory configuration of the streaming computing tasks is wasted if the difference multiple between the streaming computing tasks is too large, and then generates an alarm prompt to schedule the tasks for optimization.
In the aspect of analyzing the real-time job Task message processing capability, whether the input of the data source consumed by the real-time job in unit time is matched with the processing capability of each Operator/Task of the real-time job is judged. And judging whether the data inclination and the flow back pressure of the job occur or not by monitoring the flow of the data source and combining the throughput condition of the operators in the job, reminding the Operator parallelism degree by the optimization rule of the job, and optimizing the job resource allocation.
Through the native K8s support provided by the stream computation engine, the job can actively apply resources to the scheduler so as to achieve the elastic capacity expansion of the job resources and thus cope with sudden data spikes. Based on the construction, the optimization of task resources is fully automated, and the resource allocation of the real-time tasks can be automatically estimated and adjusted by combining the resource use conditions of the real-time tasks in different time periods, so that the aim of improving the utilization rate of the whole real-time cluster resources is fulfilled.
An abnormal state module: and collecting error data in each link of the flow type operation by utilizing SideOutput, summarizing the error data into a unified error flow, and triggering corresponding follow-up actions to carry out real-time warning by finding a mode conforming to the set characteristics in the flow type data. Specifically:
the error record contains error codes preset by the system, original input data, error types and error information. In general, error data is classified and written into a distributed file storage system, and a user can know whether the data is normal or not by monitoring a storage directory.
There are typically three situations when data is restored.
1. In the case that the data format is abnormal, such as the log is truncated to cause incomplete or the timestamp does not conform to the agreed format, the system provides for repairing the data through accurate offline batch operation, skips processing abnormal data events or re-correcting the data and re-entering the operation flow, and backfills the data to the original data pipeline.
2. Work pipe anomalies, such as changes to the actual schema of the data but no updates to the flow table configuration, may result in some field being null, but no anomalies in the overall work. And prompting the user to perform the data supplementing processing operation after prompting through the index output alarm.
3. Abnormal data links, such as switching between the abnormal data source and the master-slave cluster, but the consumption configuration is not changed, so that the conditions of overtime and the like of downstream data application can be caused. Under normal conditions, the retry mechanism and the abnormal recovery mechanism provided by the system can be switched to a healthy data source in time, and if the operation fails, an alarm is given.
When the data is complemented, firstly, the flow table on the line is updated to be configured to be the latest, and the data source is switched to be healthy, so that no more abnormal data are generated, and at the moment, partial partitions in the storage are abnormal. And therefore, independent complement operation is issued to specially repair abnormal data, the output data is written into a temporary directory, and the location of the partition is switched on the Metastore to replace the original abnormal directory. Such a complement flow is therefore transparent to the user of the offline query. And finally, replacing the data of the abnormal partition at proper time and recovering the location.
Query optimization with a streaming engine:
in the scenario of dimension table association, since the dimension table is often changed, especially the dimension is newly added, and the association operation is performed before the dimension is newly added, the association is often not performed. For the scene system, an exception handling strategy aiming at different requirements is developed, if an exception occurs during association, the data is temporarily cached and then tried, the number of times of the attempt can be controlled, and the rule of delaying association can be customized.
And an operator capable of supporting the delay association dimension table is added in SQL support. When the association does not hit, the local cache does not cache the result of the data set being empty, while temporarily storing the data in a state backend, after which it is retried according to the set timer and its number of retries.
Through the topology analysis of the operation structure, the calculation operator and the association calculation operator are connected together when the stream engine operates. Because it has no semantics of the unique key split. When the parallelism of the task is relatively large, all the buffer spaces are accessed by the subtasks associated with each dimension table, so that the buffer is under great pressure.
But observing the SQL implementation of the management operation, the equivalent connection is naturally hashed. When the operator is packaged, the configuration is directly opened, and the user can partition the data by taking the key associated with the dimension table as a hash condition. Thus, the access space between the subtasks of each operator at the downstream can be ensured to be independent, and the cache hit rate at the beginning of the operation can be greatly improved.
Index optimization with a streaming engine:
in order to accelerate data retrieval, a database often creates an index for the data in advance, and then locates the starting position of the data through the index before scanning the data, thereby accelerating data retrieval. Whereas conventional databases are typically row indexes, where the index is created by one or several fields, the index results are stored in a tree structure, such indexes can be accurate to the row level, with the highest index efficiency.
Some large data items also support row indexing, and this has the disadvantage that a large amount of index data causes delays in writing and retrieving. Most of the platform processes are data collection, such as data of a sensor, and the data collection is characterized by very high repeatability, the analysis result is very few, few target behaviors are hidden in mass data, and the proportion is often one thousandth or less. So a more cost effective block indexing scheme is already able to support the current application scenario.
In the existing scheme, index data is stored on a disk in the form of a file, a cache mechanism is additionally arranged to accelerate data access, and the index data is directly stored in a database in a system. There are mainly the following two considerations:
1. In general, a file in a column cannot be updated, and a system performs a merging operation of multiple file contents when periodically optimizing file distribution, so as to ensure consistency of query, and the database is required to provide a Transaction capability.
2. The Performance is that the database has stronger reading-writing and searching capability, even can push down the predicate to the database to finish, the high compression ratio of the database can further save the storage.
Anomaly optimization with a streaming engine:
since streaming jobs require a large number of intermediate processes to be cached, and require a lot of computing resources. The time required for a job to restart from an exception failure may be on the order of one or two minutes, which is unacceptable for some online business scenarios.
The consumption of anomaly detection and initialization is found to be a major bottleneck by analyzing the job anomaly generation process. Anomaly detection is subject to interface polling intervals and resource initialization is subject to container initialization steps. The two systems are respectively optimized, and the deactivation can be quickly found during operation. In addition, resources are reserved, and when downtime occurs, the application of the resources and the initialization time can be omitted.
In the system, a multi-group connectivity detection service is added above the streaming computing engine, and a plurality of working nodes in the detection service cluster periodically detect the connectivity of each machine in the cluster, so that the reliability is ensured because the system is multi-group.
In addition, in terms of reserving resources, the system expands a resource application model of the streaming engine operation, when the streaming operation is submitted, resource redundancy parameters can be set, when the redundancy parameters are activated, the quantity of the redundant resources can be automatically ensured to be higher than the quantity of resource missing caused by single-point faults, and the aggregation of the redundant resources is avoided in resource arrangement.
Meanwhile, the system can control the consumption speed of the data source when the cluster hardware resources are fully loaded. A coordinator is introduced to periodically check the resource consumption on the job collector and the progress of the data source watermark. And according to the global current situation, predicting the maximum position which is allowed to be read by each data source next, and then issuing the maximum position to all the data sources. And the data source determines the reading speed according to the obtained maximum position and the current position. And limiting the consumption speed of the data according to the load. And the consumption speed of all the data sources is dynamically regulated, so that the stability of streaming operation is ensured.
Prometaus is an open source service monitoring system and time series database; meta is an auxiliary tag of html language head area, which is located at the head of the document and does not contain any content; grafana is a dashboard and graphic editor; sideOutput is a side output, any number of additional side output result streams; JMX is a framework for implanting management functions for applications, devices, systems, etc.; an agent is a software or hardware entity that can autonomously move; log4j is an open source item of Apache, and by using Log4j, log information delivery can be controlled to be destined for consoles, files, GUI components, even socket servers, NT event recorders, UNIX Syslog daemons, etc. Transaction generally refers to what is to be done or what is done. In computer terminology, refers to a program execution unit that accesses and possibly updates various data items in a database. Performance is an API for front end Performance monitoring. It can detect the performance in the page, a new API introduced by the W3C performance team, it can detect white screen time, first screen time, user operable time nodes, time of total page download, time of DNS query, time of TCP link, etc.
The workflow flow after combining the data processing system based on the stream computing engine with the data monitoring system based on the stream computing engine is as follows:
1. according to scene needs, a user uploads a custom function developed for business logic in a jar packet mode through an uploading module, and data processing operators and SQL functions needed to be used in a data processing process are expanded and defined
2. The user describes the data flow in the job through SQL sentences, sorts and compiles the front-back relation of job data processing, and then adjusts the parallelism of each processing node and the resource allocation of the whole job according to the requirement. Meanwhile, the method also supports the uploading of jar packets to realize finer control on the operation process by realizing the interface task arrangement process.
3. The decision rule of the rule engine is added according to the service requirement, including threshold value decision, mode decision, combination logic decision and the like, and the user definition is supported for the complex association decision rule, so that after the jar packet of the system specified interface is uploaded, the configuration can be carried out in the system.
4. Configuring a rule learning module of the system according to the operation requirement, starting a system evolution function of the platform, continuously training a rule model in the process of processing data by the system, and optimizing rules of the evolution platform by learning interactive logic of a user and optimizing flow peak-valley characteristics, rule hit rate, resource occupation condition and the like of operation data including data. The refining rule which is more in line with the business operation characteristics is evolved to be on line after manual confirmation.
5. And configuring a data source and a data sinking channel of the operation, and supporting reading and writing data from the data channels of a distributed storage system, a structured database, a NoSQL database, a message queue and the like. Meanwhile, the construction and analysis of various data formats such as json, protobuffer, avro are supported.
6. Submitting the grammar specification and the custom package writing specification to an operating system layout, and firstly generating a processing module through logic disassembly and simultaneously checking the grammar specification and the custom package writing specification in the operation flow. The directed acyclic graph constructed by the sub-modules is displayed on the page, the data flow direction of the job and the arrangement sequence of operators can be clearly displayed, and the data flow direction and the arrangement sequence of operators are submitted to the task scheduling module to request resources and pull up the job after checking.
7. After the job request is applied to the job resource from the resource cluster, the data of the corresponding label is continuously acquired from the data source, the data is processed according to the configured rule, and the rule is judged by the rule engine and then is output to the real-time display module or the data sink is carried out. In the operation process, a user can know the operation details through the task monitoring module, such as the operation throughput, the network IO, the resource occupation, the module backpressure and other information, so that the user can know the operation state of the operation.
8. According to the configuration of the user, confirmation points are generated in a time-division or operation batch in the operation process, and all state information of the current operation is contained. Meanwhile, a service archiving point is generated in the interval period of the service flow, and the service archiving point contains the calculation result of the current operation. By combining the two mechanisms, the operation indexes can be accumulated, and the operation state can be analyzed. And simultaneously, when the operation unit is abnormal, the abnormal recovery capability is provided.
9. When an abnormal situation occurs in the workflow, the abnormal recovery is performed according to the user configuration, including the abnormal processing configuration such as attempting to restart the job, attempting to recover from the confirmation point, recording the abnormal data, or fast failure and push. The task state in the operation monitoring module can be updated in real time, so that operation and maintenance personnel can be helped to acquire task abnormal information in time
10. When the operation is terminated, the resource monitoring module can recycle the resources after acquiring the state information of the current task, release the operation resources, clear the file cache, reserve or clear the archiving point and the confirmation point according to the user configuration. And storing the state data in the task process into a time sequence database, and ending the streaming computing operation.
Those skilled in the art will appreciate that the invention provides a system and its individual devices, modules, units, etc. that can be implemented entirely by logic programming of method steps, in addition to being implemented as pure computer readable program code, in the form of logic gates, switches, application specific integrated circuits, programmable logic controllers, embedded microcontrollers, etc. Therefore, the system and various devices, modules and units thereof provided by the invention can be regarded as a hardware component, and the devices, modules and units for realizing various functions included in the system can also be regarded as structures in the hardware component; means, modules, and units for implementing the various functions may also be considered as either software modules for implementing the methods or structures within hardware components.
The foregoing describes specific embodiments of the present invention. It is to be understood that the invention is not limited to the particular embodiments described above, and that various changes or modifications may be made by those skilled in the art within the scope of the appended claims without affecting the spirit of the invention. The embodiments of the present application and features in the embodiments may be combined with each other arbitrarily without conflict.
Claims (10)
1. A data processing system based on a streaming computing engine, comprising:
and a data acquisition module: after acquiring original data from a terminal, synchronously compiling coding and decoding protocols of a transmitting end and a receiving end, matching corresponding table information from an unbounded data stream to finish the acquisition of information events, and cleaning and screening service field information;
front-end filter module: filtering the light weight data provided by the collected and screened information through a bloom filter, and synchronously filtering the heavy weight data provided by Key-Value storage;
and a data preprocessing module: the streaming computing engine provides a unified message processing path of the capability of the streaming system, avoids resource waste generated by offline and real-time two sets of service development, and abstracts the data flow by providing SQL support;
Data blood margin module: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a computing platform for analyzing and starting the operation.
2. The data processing system based on the stream computing engine according to claim 1, wherein when the data acquisition module acquires the information event, the log file is in a mixed format when written, records in a row format and a state format are required to be respectively analyzed according to the record storage format, the log file is updated into the log file after the stream computing engine submits the transaction to refresh to the disk, and the log consumption point is required to be recorded to cope with the abnormal restarting condition of the operation.
3. The data processing system based on the stream computing engine according to claim 1, wherein after the data acquisition module acquires the unbounded data stream, the unbounded data stream is sent to the consumption queue, peak clipping and valley filling of the data are completed by high throughput processing provided by the message queue, and multi-partition setting is adopted for message subjects of the message queue.
4. The data processing system based on the stream computing engine according to claim 1, wherein the data acquisition module provides storage support including HDFS and Kafka during the management and acquisition phase of data while supporting data stored in the multi-parallelism consumption message queue, ensuring that the consumption speed of the data link does not form a short board at the entrance.
5. The data processing system based on a streaming computing engine of claim 1, wherein the pre-filter module de-duplicates data using a hash function built in a bloom filter;
-merging the message records holding the same Key using Key-Value store;
the hash algorithm is used to convert the key into an integer for storage, which takes up a maximum of 8 bytes.
6. The data processing system based on the stream computing engine according to claim 1, wherein the pre-filtering module filters illegal data by a custom service threshold or a filter operator for compiling custom filtering rules for the data stream after the de-duplication processing.
7. The data processing system based on the stream computing engine according to claim 1, wherein the data blood margin module defines the database schema definition language designation data meta information, re-writes the data manipulation language designation data processing process, defines the real-time data stream as a real-time table to be stored in the meta storage component, and the user directly selects the required data source therefrom, and then can complete the job development only by writing the statement of the data manipulation language.
8. The data processing system based on the Streaming computing engine according to claim 7, wherein the data blood margin module supplements the converted database schema definition language statement according to the upstream and downstream databases to form a complete Streaming SQL script for submission when the task is submitted, and communicates development flows of batch processing services and Streaming processing services.
9. A data processing method based on a stream computing engine, comprising the steps of:
and a data acquisition step: after acquiring original data from a terminal, synchronously compiling coding and decoding protocols of a transmitting end and a receiving end, matching corresponding table information from an unbounded data stream to finish the acquisition of information events, and cleaning and screening service field information;
the pre-filtration step: filtering the light weight data provided by the collected and screened information through a bloom filter, and synchronously filtering the heavy weight data provided by Key-Value storage;
a data preprocessing step: the stream computing engine provides stream batch unified stream processing, avoids resource waste generated by offline and real-time two sets of service development, and abstracts data stream by providing SQL support;
Data blood-edge step: and writing the flow direction of script description data by adopting SQL (structured query language) through SQL (structured query language) abstract processing provided by a streaming computing engine, and submitting the flow direction to a computing platform for analyzing and starting the operation.
10. A computer readable storage medium storing a computer program, which when executed by a processor implements the steps of the method of claim 9.
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202111226614.8A CN116010452A (en) | 2021-10-21 | 2021-10-21 | Industrial data processing system and method based on stream type calculation engine and medium |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202111226614.8A CN116010452A (en) | 2021-10-21 | 2021-10-21 | Industrial data processing system and method based on stream type calculation engine and medium |
Publications (1)
| Publication Number | Publication Date |
|---|---|
| CN116010452A true CN116010452A (en) | 2023-04-25 |
Family
ID=86028511
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202111226614.8A Pending CN116010452A (en) | 2021-10-21 | 2021-10-21 | Industrial data processing system and method based on stream type calculation engine and medium |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN116010452A (en) |
Cited By (8)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN116414390A (en) * | 2023-03-29 | 2023-07-11 | 南京审计大学 | Dynamic operation case development system for big data audit |
| CN117033322A (en) * | 2023-07-08 | 2023-11-10 | 武汉光庭信息技术股份有限公司 | Big data storage method, system, electronic equipment and storage medium integrating stream and batch |
| CN117312391A (en) * | 2023-10-23 | 2023-12-29 | 中南民族大学 | Big data platform dynamic index evaluation method and system based on stream computing |
| CN117971901A (en) * | 2024-01-30 | 2024-05-03 | 华南理工大学 | Multi-channel unbounded stream connection method for stream processing system based on Streaming SQL |
| CN118113437A (en) * | 2024-03-11 | 2024-05-31 | 深圳市今天国际物流技术股份有限公司 | An offline development platform system based on big data |
| CN118194207A (en) * | 2024-05-15 | 2024-06-14 | 中节能晶和科技有限公司 | Rule engine-based road illumination Internet of things equipment anomaly monitoring system and method |
| CN119473441A (en) * | 2024-09-23 | 2025-02-18 | 太极计算机股份有限公司 | A component-based streaming data processing method |
| CN117971901B (en) * | 2024-01-30 | 2025-10-14 | 华南理工大学 | Multi-channel unbounded stream connection method for stream processing system based on Streaming SQL |
-
2021
- 2021-10-21 CN CN202111226614.8A patent/CN116010452A/en active Pending
Cited By (10)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN116414390A (en) * | 2023-03-29 | 2023-07-11 | 南京审计大学 | Dynamic operation case development system for big data audit |
| CN116414390B (en) * | 2023-03-29 | 2024-04-05 | 南京审计大学 | Dynamic operation case development system for big data audit |
| CN117033322A (en) * | 2023-07-08 | 2023-11-10 | 武汉光庭信息技术股份有限公司 | Big data storage method, system, electronic equipment and storage medium integrating stream and batch |
| CN117312391A (en) * | 2023-10-23 | 2023-12-29 | 中南民族大学 | Big data platform dynamic index evaluation method and system based on stream computing |
| CN117971901A (en) * | 2024-01-30 | 2024-05-03 | 华南理工大学 | Multi-channel unbounded stream connection method for stream processing system based on Streaming SQL |
| CN117971901B (en) * | 2024-01-30 | 2025-10-14 | 华南理工大学 | Multi-channel unbounded stream connection method for stream processing system based on Streaming SQL |
| CN118113437A (en) * | 2024-03-11 | 2024-05-31 | 深圳市今天国际物流技术股份有限公司 | An offline development platform system based on big data |
| CN118194207A (en) * | 2024-05-15 | 2024-06-14 | 中节能晶和科技有限公司 | Rule engine-based road illumination Internet of things equipment anomaly monitoring system and method |
| CN119473441A (en) * | 2024-09-23 | 2025-02-18 | 太极计算机股份有限公司 | A component-based streaming data processing method |
| CN119473441B (en) * | 2024-09-23 | 2025-05-27 | 太极计算机股份有限公司 | A component-based streaming data processing method |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| CN116009428B (en) | Industrial data monitoring system, method and medium based on streaming computing engine | |
| US11468062B2 (en) | Order-independent multi-record hash generation and data filtering | |
| US10554771B2 (en) | Parallelized replay of captured database workload | |
| US20220179860A1 (en) | Database workload capture and replay | |
| CN116010452A (en) | Industrial data processing system and method based on stream type calculation engine and medium | |
| CN102323945B (en) | SQL (Structured Query Language)-based database management method and device | |
| CN104423960B (en) | A kind of method and system of project continuous integrating | |
| US20130145350A1 (en) | Efficient, large scale trace storage system | |
| WO2007036932A2 (en) | Data table management system and methods useful therefor | |
| CN110750582B (en) | Data processing method, device and system | |
| CN113220530B (en) | Data quality monitoring method and platform | |
| CN118394829A (en) | Data blood edge analysis method, device, equipment and readable storage medium | |
| CN117149873A (en) | Data lake service platform construction method based on flow batch integration | |
| CN118377768A (en) | Data ETL method, device, equipment and medium based on service flow | |
| CN118227181A (en) | Method and system for counting user software thermal update in real time | |
| CN113553320B (en) | Data quality monitoring method and device | |
| CN114942916B (en) | Real-time data warehouse design method, device, equipment and storage medium based on Doris | |
| Fjällid | A comparative study of databases for storing sensor data | |
| Aytas | Designing Big Data Platforms: How to Use, Deploy, and Maintain Big Data Systems | |
| Rishaug et al. | Event sourcing | |
| Djiken et al. | Indexing Large Amount of Log Data for Predictive Maintenance | |
| Höger | Fault tolerance in parallel data processing systems | |
| Santos | Data ingestion in Smart Cities | |
| Guntupalli | From SQL to Spark: My Journey into Big Data and Scalable Systems How I Debug Complex Issues in Large Codebases | |
| Chen et al. | Towards Low-Latency Big Data Infrastructure |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination |