[go: up one dir, main page]

CN115328664B - Message consumption method, device, equipment and medium - Google Patents

Message consumption method, device, equipment and medium Download PDF

Info

Publication number
CN115328664B
CN115328664B CN202211237370.8A CN202211237370A CN115328664B CN 115328664 B CN115328664 B CN 115328664B CN 202211237370 A CN202211237370 A CN 202211237370A CN 115328664 B CN115328664 B CN 115328664B
Authority
CN
China
Prior art keywords
message
target
component
partition
preset
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Active
Application number
CN202211237370.8A
Other languages
Chinese (zh)
Other versions
CN115328664A (en
Inventor
顾悦
薛飞
黄岗
周圣强
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
OP Retail Suzhou Technology Co Ltd
Original Assignee
OP Retail Suzhou Technology Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by OP Retail Suzhou Technology Co Ltd filed Critical OP Retail Suzhou Technology Co Ltd
Priority to CN202211237370.8A priority Critical patent/CN115328664B/en
Publication of CN115328664A publication Critical patent/CN115328664A/en
Application granted granted Critical
Publication of CN115328664B publication Critical patent/CN115328664B/en
Active legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Images

Classifications

    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/50Allocation of resources, e.g. of the central processing unit [CPU]
    • G06F9/5005Allocation of resources, e.g. of the central processing unit [CPU] to service a request
    • G06F9/5027Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/54Interprogram communication
    • G06F9/546Message passing systems or structures, e.g. queues
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/54Interprogram communication
    • G06F9/547Remote procedure calls [RPC]; Web services
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F2209/00Indexing scheme relating to G06F9/00
    • G06F2209/54Indexing scheme relating to G06F9/54
    • G06F2209/547Messaging middleware
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F2209/00Indexing scheme relating to G06F9/00
    • G06F2209/54Indexing scheme relating to G06F9/54
    • G06F2209/548Queue

Landscapes

  • Engineering & Computer Science (AREA)
  • Software Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • General Factory Administration (AREA)

Abstract

The application discloses a message consumption method, a device, equipment and a medium, which relate to the technical field of intelligent platforms, and the method comprises the following steps: pulling a first target message from a preset message publishing system by using a preset consumer component, and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs; controlling a target partition worker component to process a first target message by using a preset service executor component, marking the running state of the target partition worker component as a finished state after the first target message is processed, and recording the displacement of the target partition worker component; when the preset consumer component monitors that the running state of the target partition worker component is the completion state, the displacement of the target partition worker component is obtained, and the displacement is manually submitted to a preset message issuing system. The invention defines the consumer assembly by user and realizes the decoupling of message processing and message processing management.

Description

Message consumption method, device, equipment and medium
Technical Field
The invention relates to the technical field of intelligent platforms, in particular to a message consumption method, a message consumption device, message consumption equipment and a message consumption medium.
Background
In daily development, a single thread is often used for processing messages of kafka (a high-throughput distributed publish-subscribe message system) in development, the advantage of processing messages in the single thread is that implementation is simple, submission of displacement is managed by kafka (active submission of displacement), but the disadvantage is obvious, and because of the single thread, the message processing capability is limited by concurrency, and the method is often not suitable for a large-batch message processing scene. If multiple threads are used to process messages on the premise of single-thread message pulling, the use of active submission of displacement can lead to inaccuracy in message displacement submission. On this basis, therefore, it is proposed to use a single thread to pull messages, to process messages in parallel in multiple threads, and to handle the submission of displacements manually. Similarly, in order to simplify the logic of message processing, the consumer usually uses the service logic to process the message directly after pulling the message, so that the acquisition logic of the message and the processing logic of the service are mixed together and coupled with each other, and in case of change of the message processing logic, the original logic of message acquisition is changed carelessly, which may result in bug generation.
Therefore, in the message consumption process, how to avoid the inaccurate situation of message displacement submission when the displacement is submitted actively due to single-thread message processing and strong coupling of the acquisition logic of the message and the processing logic of the service is a problem to be solved in the field.
Disclosure of Invention
In view of this, an object of the present invention is to provide a message consumption method, apparatus, device, and medium, which can customize consumer components, achieve high throughput, manually submit displacement, have low coupling with services, have high message consumption accuracy, decouple message processing logic from message processing management, and asynchronously process messages in multiple threads. The specific scheme is as follows:
in a first aspect, the present application discloses a message consumption method, including:
pulling a first target message from a preset message publishing system by using a preset consumer component, and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs;
controlling the target partition worker component to process the first target message by utilizing a preset service executor component;
after the first target message is processed, marking the running state of the target partition worker component as a finished state, and recording the displacement of the target partition worker component;
when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system.
Optionally, the sending the first target message to the target partition worker component uniquely corresponding to the target partition to which the first target message belongs includes:
determining a message identifier of the first target message, and determining a target partition based on a target subject to which the first target message belongs and a partition identifier in the message identifier;
and sending the first target message to a target partition worker component uniquely corresponding to the target partition.
Optionally, the sending the first target message to the target partition worker component uniquely corresponding to the target partition to which the first target message belongs includes:
sending the first target message to a message queue of a target partition worker component uniquely corresponding to a target partition to which the first target message belongs;
correspondingly, the controlling the target partition worker component to process the first target message by using a preset service executor component, and after the first target message is processed, marking the running state of the target partition worker component as a finished state, and recording the displacement of the target partition worker component includes:
and controlling the target partition worker component to process the first target message in the message queue by using a preset service executor component, marking the running state of the target partition worker component as a finished state after all messages in the message queue are processed, and recording the displacement of the target partition worker component.
Optionally, the controlling the target partition worker component to process the first target message in the message queue by using a preset service executor component, and after all messages in the message queue are processed, marking the running state of the target partition worker component as a completed state, and recording a displacement amount of the target partition worker component includes:
controlling the target partition worker component to call a first method predefined in a preset service executor component to process the messages in the message queue corresponding to the target partition worker component, and monitoring whether unprocessed messages exist in the message queue corresponding to the target partition worker component;
if unprocessed messages do not exist in the message queue corresponding to the target partition worker component, marking the running state of the target partition worker component as a completion state;
and calling a predefined second method in the preset service executor component to call a target function interface, and recording the displacement of the target partition worker component.
Optionally, the message consumption method further includes:
when the preset consumer component monitors that the running state of the target partition worker component is an unfinished state, acquiring and recording the displacement of the target partition worker component, and prohibiting the step of manually submitting the displacement to the preset message issuing system.
Optionally, after the pulling, by using a preset consumer component, a first target message from a preset message publishing system and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs, the method further includes:
calling a preset application programming interface in the preset message publishing system to control the preset consumer component to skip the target partition when pulling the message next time so as to pull the message from the next partition of the target partition;
correspondingly, after the first target message processing is finished, the method further includes:
and calling a preset application programming interface in the preset message publishing system to control the preset consumer component to pull a second target message from the target partition, and sending the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs.
Optionally, the controlling the preset consumer component to pull the second target message from the target partition and send the second target message to the target partition worker component uniquely corresponding to the target partition to which the second target message belongs includes:
and determining the next position of the displacement of the target partition worker component in the target partition as a target message pulling position, controlling the preset consumer component to pull a second target message from the target message pulling position, and then sending the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs.
In a second aspect, the present application discloses a message consumption apparatus comprising:
the message pulling module is used for pulling a first target message from a preset message publishing system by using a preset consumer component and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs;
the partition worker component working module is used for controlling the target partition worker component to process the first target message by using a preset service executor component, marking the running state of the target partition worker component as a completion state after the first target message is processed, and recording the displacement of the target partition worker component;
and the displacement submitting module is used for acquiring the displacement of the target partition worker component and manually submitting the displacement to the preset message issuing system when the preset consumer component monitors that the running state of the target partition worker component is a finished state.
In a third aspect, the present application discloses an electronic device, comprising:
a memory for storing a computer program;
a processor for executing the computer program to implement the aforementioned message consumption method.
In a fourth aspect, the present application discloses a computer storage medium for storing a computer program; wherein the computer program realizes the steps of the message consumption method disclosed in the foregoing when being executed by a processor.
The method comprises the steps that a preset consumer component is used for pulling a first target message from a preset message publishing system, and the first target message is sent to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs; controlling the target partition worker component to process the first target message by utilizing a preset service executor component; after the first target message is processed, marking the running state of the target partition worker component as a finished state, and recording the displacement of the target partition worker component; when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system. Therefore, the consumer component is divided into the consumer component, the partition worker component and the service executor component, so that decoupling of message processing logic and message processing management is realized, messages are processed asynchronously in a multithreading mode, high message throughput and manual displacement submission are realized, coupling with services is low, and message consumption accuracy is high.
Drawings
In order to more clearly illustrate the embodiments of the present invention or the technical solutions in the prior art, the drawings used in the description of the embodiments or the prior art will be briefly described below, it is obvious that the drawings in the following description are only embodiments of the present invention, and for those skilled in the art, other drawings can be obtained according to the provided drawings without creative efforts.
FIG. 1 is a flow chart of a message consumption method provided by the present application;
FIG. 2 is a schematic diagram of a consumer class component according to the present application;
FIG. 3 is a flow chart of a specific message consumption method provided by the present application;
fig. 4 is a schematic structural diagram of a message processing provided in the present application;
FIG. 5 is a schematic view of a consumer component workflow provided by the present application
FIG. 6 is a schematic diagram of a message consumption device according to the present application;
fig. 7 is a block diagram of an electronic device provided in the present application.
Detailed Description
The technical solutions in the embodiments of the present invention will be clearly and completely described below with reference to the drawings in the embodiments of the present invention, and it is obvious that the described embodiments are only a part of the embodiments of the present invention, and not all of the embodiments. All other embodiments, which can be derived by a person skilled in the art from the embodiments given herein without making any creative effort, shall fall within the protection scope of the present invention.
In the prior art, single-thread message processing is adopted, and the coupling between the message acquisition logic and the service processing logic is strong, so that the message displacement submission is inaccurate when the displacement is submitted actively and a bug is easy to generate in operation. The invention can self-define consumer components, realize high throughput, manually submit displacement, low coupling with service, high message consumption accuracy, decoupling of message processing logic and message processing management, and multithreading asynchronous message processing.
The embodiment of the invention discloses a message consumption method, which is described with reference to fig. 1 and comprises the following steps:
step S11: the method comprises the steps of pulling a first target message from a preset message publishing system by using a preset consumer component, and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs.
In this embodiment, the pulling the first target message from the preset message publishing system by using the preset consumer component may include: pulling a first target message from the kafka by using a preset consumer component; correspondingly, the manually submitting the displacement amount to the preset message publishing system may include: the displacement amount is manually submitted to kafka. That is, the preset message issuing system in this embodiment may be kafka.
In this embodiment, the consumer class component of message consumption is divided into three components to be executed, namely, a consumer component, which may be referred to as a TurboKafka consumer, a partition worker component, which may be referred to as a partition worker thread, and a service executor component, which may be referred to as a service execution unit. One consumer class component comprises a consumer, a plurality of partitionworkers and a corresponding service executor component BusinessExecutor.
Specifically, the consumer component turbokafka consumer mainly has the responsibility of acquiring a message, handing the message to a dedicated partition worker thread according to a partition to which the message belongs, and determining whether to submit a displacement of the partition according to a processing result of the message processing thread. Meanwhile, the TurboKafka Consumer maintains a registry, namely a registry formed by a partition id and a partitionWorker corresponding to the partition; the PartitionWorker is a working thread of partition information, each partition corresponds to the respective PartitionWorker, and the number of the partitionworkers depends on the number of the partitions and is specifically used for processing the partition information; business executor is an abstraction of business logic and is used for connecting an interface of a specific business function, and when a business processes a message, the business only needs to realize the interface and realize the specific message processing logic. It should be noted that business executors are the key to implementing the decoupling of message processing logic and message processing management, and the binding of consumers and business logic can be implemented through the abstraction of this interface. Fig. 2 is a schematic diagram of a working structure of a consumer class component according to the present application, where kafka is connected to a custom consumer class component in the present invention, that is, a consumer component turbokafka provider, a partition worker component PartitionWorker, and a service executor component BusinessExecutor, and the service executor component BusinessExecutor is connected to an external function interface, and interacts with the external function interface to implement calling.
In a specific implementation process, the major role of the turbo kafka Consumer is to pull a message from kafka, and the message acquisition is divided into three steps: (1) distributing messages: each message has the mark of the partition to which the message belongs, so the message can be delivered to the corresponding PartitionWorker according to the partition of the message after the message is obtained, and then one api of kafka is called, so that the current processed partition is passed by the consumer when the consumer pulls the message next time; (2) checking the status of PartitionWorker: checking the state of the PartitionWorker in work, if the current state of the PartitionWorker is a finished state, acquiring the latest displacement of the PartitionWorker, caching the latest displacement, and recovering the acquisition of the message of the current partition; (3) And manually submitting the displacement information of the partitions according to the displacement information of the partitions cached in the last step.
In this embodiment, after the pulling, by using a preset consumer component, a first target message from a preset message publishing system and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs, the method further includes: calling a preset application programming interface in the preset message publishing system to control the preset consumer component to skip the target partition when pulling the message next time so as to pull the message from the next partition of the target partition; correspondingly, after the first target message processing is finished, the method further includes: and calling a preset application programming interface in the preset message publishing system to control the preset consumer component to pull a second target message from the target partition, and sending the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs. That is, after the preset consumer component acquires the message, the message can be delivered to the corresponding PartitionWorker according to the partition of the message, after that, the preset API of kafka is called, so that the consumer passes through the currently processed partition when pulling the message next time, and after all the messages corresponding to the target partition worker component are processed, the API allows the consumer to pull the message from the target partition next time.
In a specific implementation manner in this embodiment, after pulling the first target message and sending the first target message to the target partition worker, the consumer may call the preset API of kafka to set the current message pull state to the pull-prohibited state, and when the running state of the target partition worker component is the completion state, call the preset API of kafka to set the message pull state of the target partition to the pull-permitted state, at this time, pull the second target message from the target partition, and start a new round of message processing.
What is accomplished in this step is the above-mentioned process of utilizing the TurboKafka Consumer class to pull messages from the preset message distribution system and distribute the bulk messages.
Step S12: and controlling the target partition worker component to process the first target message by using a preset service executor component, marking the running state of the target partition worker component as a finished state after the first target message is processed, and recording the displacement of the target partition worker component.
In a specific implementation process of this embodiment, the partition worker component PartitionWorker may process a current message by using a preset service executor component BusinessExecutor, mark a running state of the partition worker component PartitionWorker, and determine and record a displacement amount of the target partition worker component.
Step S13: when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system.
In this embodiment, the turbokafka conditioner monitors the state of PartitionWorker, and when the state of PartitionWorker is monitored to be a complete state, the latest displacement of the PartitionWorker is obtained and cached, and the obtaining of the message of the current partition is recovered by using a preset API in a preset message publishing system.
In this embodiment, the message consumption method may further include: when the preset consumer component monitors that the running state of the target partition worker component is an unfinished state, acquiring and recording the displacement of the target partition worker component, and prohibiting the step of manually submitting the displacement to the preset message issuing system.
In this embodiment, if the status of the turbokafka provider monitoring PartitionWorker is an incomplete status, the displacement of the target partition worker component is also obtained and recorded, but the step of manually submitting the displacement to the preset message publishing system is not performed, that is, only the displacement is recorded, and the displacement is not manually submitted.
In this embodiment, the controlling the preset consumer component to pull the second target message from the target partition and send the second target message to the target partition worker component uniquely corresponding to the target partition to which the second target message belongs includes: and determining the next position of the displacement of the target partition worker component in the target partition as a target message pulling position, controlling the preset consumer component to pull a second target message from the target message pulling position, and then sending the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs. That is, each time the preset consumer component pulls a message, the preset consumer component pulls the message from the next position of the displacement amount recorded by the partition work corresponding to the current partition. After pulling the message, the message is also sent to the corresponding partition worker component, and then the running state of the corresponding partition worker component is marked as an unfinished state.
In this embodiment, a preset consumer component is used to pull a first target message from a preset message publishing system, and send the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs; controlling the target partition worker component to process the first target message by using a preset service executor component, marking the running state of the target partition worker component as a finished state after the first target message is processed, and recording the displacement of the target partition worker component; when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system. Therefore, the invention realizes decoupling of message processing logic and message processing management by dividing the consumer assembly into the consumer assembly, the partition worker assembly and the service executor assembly, and multithreading asynchronous message processing, thereby realizing high message throughput and manual displacement submission.
Fig. 3 is a flowchart of a specific message consumption method according to an embodiment of the present disclosure. Referring to fig. 3, the method includes:
step S21: the method comprises the steps of pulling a first target message from a preset message publishing system by using a preset consumer component, determining a message identifier of the first target message, then determining a target partition based on a target subject to which the first target message belongs and a partition identifier in the message identifier, and sending the first target message to a message queue of a target partition worker component uniquely corresponding to the target partition to which the first target message belongs.
In this embodiment, each message includes a corresponding message identifier, the message representation includes a subject (i.e., topic) to which the message belongs, and a partition to which the message belongs, in this step, a target partition may be determined based on the subject to which the message belongs and a partition ID of the partition, a target work area worker component corresponding to the target partition is determined based on a local registry, and then the first target message is sent to the target work area worker component for processing.
In this embodiment, the target partition worker component partionworker processes the first target message. Specifically, the messages in each partition thread are stored in a queue form, and the messages in the queue can be interacted with an external function interface through a method defined in the BusinessExecutor to realize calling. Fig. 4 is a schematic structural diagram of message processing proposed in the present application, that is, a consumer distributes a message to partition worker threads, and the partition worker threads store the received message in respective corresponding message queues for processing.
In this embodiment, the controlling the target partition worker component to process the first target message by using a preset service executor component, and after the first target message is processed, marking an operating state of the target partition worker component as a finished state includes: and controlling the target partition worker component to process the first target message in the message queue by using a preset service executor component, and marking the running state of the target partition worker component as a finished state after all messages in the message queue are processed. That is, when the target partition worker component processes a message, the message in the current corresponding message queue is processed.
Step S22: and controlling the target partition worker component to call a first method predefined in a preset service executor component to process the messages in the message queue corresponding to the target partition worker component, and monitoring whether unprocessed messages exist in the message queue corresponding to the target partition worker component.
In this embodiment, the controlling the target partition worker component to invoke a first method predefined in a preset service executor component to process the first target message may include: controlling the target partition worker component to call a first method defined in advance in a preset service executor component to determine a message to be processed from a message queue corresponding to the target partition worker component, and processing the message to be processed; and after the to-be-processed message is processed, removing the current to-be-processed message from the message queue, re-determining the to-be-processed message, and then executing the step of processing the to-be-processed message.
In a specific implementation process, the business execution defines two main methods, namely a first method execute (ConsumerRecord) for processing specific single messages and a second method aft execution () for an interface method executed after processing the messages, and a specific service logic only needs to be defined in the aft execution when the service is used.
In a specific implementation process, partitionWorker is a class for actually processing messages, two methods of BusinessExecutor are mainly called inside, because a consumer pulls many messages once, a partition worker thread can batch process the messages, the execution method of BusinessExecutor can call each message once in a circulation of the partitionWorker batch processing messages, whether unprocessed messages exist in a message queue can be monitored, after all messages are processed, the afterExecutor method of BusinessExecutor can be called, processing logic after all messages are processed can be defined, after all messages are processed, the state of partionWorker can be marked as completed, and the latest displacement of the current message processing is recorded.
Step S23: and if the message queue corresponding to the target partition worker component does not have unprocessed messages, marking the running state of the target partition worker component as a completion state.
In this step, when unprocessed information does not exist in the message queue corresponding to the target partition worker component, indicating that all messages in the message queue are processed, the operating state of the target partition worker component is marked as a completed state.
Step S24: and calling a predefined second method in the preset service executor component to call a target function interface, and recording the displacement of the target partition worker component.
In this embodiment, after the target partition worker component marks the running state of the target partition worker component as a complete state, a second method, asterexecute, is called, a processing logic after all messages are processed is defined, and the second method is mainly used for calling a target function interface.
In the embodiment of the present invention, the function interface may be an interface method for implementing a time recording function, and at this time, the time data stored in kafka is consumed by the present technical solution.
After the target function interface is called, the target partition worker component records the displacement of the target partition worker component.
Step S25: when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system.
In a specific implementation manner in this embodiment, if one kafka is provided with 4 partitions, 4 partition worker components are provided in the corresponding custom consumer class component, and if 800 messages are pulled by the consumer component at one time and stored in the corresponding target partition thread according to the message structure of each message to implement multi-thread concurrent processing, 100 messages are processed at one time by partition 0, 200 messages are processed at one time by partition 1, 200 messages are processed at one time by partition 2, and 300 messages are processed at one time by partition 3. After each partition stores the message until the consumer component does not pull the message for that partition from kafka before processing is complete. Each message of each partition implements a function call with an external interface through an execute (ConsumerRecord) method of the service processing unit, and implements an interface method executed after processing the messages through an aferExecute () method of the service processing unit. After all the messages are processed, the running state of the ParitionWorker of the corresponding partition is marked to be a finished state, the message pulling state of the corresponding partition is set to be a pulling-allowed state so as to recover the acquisition of the messages of the current partition, and the latest displacement of the current message processing is recorded.
Moreover, the consumer component will regularly check the state of each ParitionWorker, if the running state of the partition 0 is marked as complete, record 100 displacement, and recover the message of pulling the corresponding partition 0 from kafka; when the partition 1 is not marked completely, recording the displacement of 130 pieces … …, and the consumer component submits displacement at this time, only reporting the partition 0 which is processed completely, and when pulling the message next time, pulling the message of kafka starts to pull the data from the 101 th piece of the partition 0, and performing a new round of message processing. Messages stored in the threads distributed to the ParitionWorker are decoupled from the service through different functional interfaces or flowing to different service requests.
Fig. 5 is a schematic diagram of a work flow of a consumer component, which is proposed in the present application, and in the diagram, a consumer component pulls a message of each partition to kafka, and forwards the message to a thread corresponding to each partition for processing, and each service executor concurrently and continuously executes consumption from each blocking queue corresponding to each thread. In the preset partition offset management, offset information of each partition is stored, and an offset is submitted to kafka.
In the embodiment, the data pulled in the kafka is processed by different partition workers through the custom consumer component, and is flowed to different service requests through different functional interfaces, so that decoupling of messages and services is realized, high throughput under multithreading asynchronous message processing is realized, and message consumption accuracy is higher in the method.
Referring to fig. 6, an embodiment of the present application discloses a message consumption apparatus, which may specifically include:
the message pulling module 11 is configured to pull a first target message from a preset message publishing system by using a preset consumer component, and send the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs;
a partition worker component working module 12, configured to control the target partition worker component to process the first target message by using a preset service executor component, mark an operating state of the target partition worker component as a completed state after the first target message is processed, and record a displacement of the target partition worker component;
a displacement submitting module 13, configured to, when the preset consumer component monitors that the running state of the target partition worker component is a complete state, obtain a displacement of the target partition worker component, and manually submit the displacement to the preset message publishing system.
The method comprises the steps that a preset consumer component is used for pulling a first target message from a preset message publishing system, and the first target message is sent to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs; controlling the target partition worker component to process the first target message by using a preset service executor component, marking the running state of the target partition worker component as a finished state after the first target message is processed, and recording the displacement of the target partition worker component; when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system. Therefore, the invention realizes decoupling of message processing logic and message processing management by dividing the consumer assembly into the consumer assembly, the partition worker assembly and the service executor assembly, and multithreading asynchronous message processing, thereby realizing high message throughput and manual displacement submission.
Further, an electronic device is also disclosed in the embodiments of the present application, fig. 7 is a block diagram of the electronic device 20 shown in the exemplary embodiments, and the content in the diagram cannot be considered as any limitation to the scope of the application.
Fig. 7 is a schematic structural diagram of an electronic device 20 according to an embodiment of the present disclosure. The electronic device 20 may specifically include: at least one processor 21, at least one memory 22, a power supply 23, a display 24, an input-output interface 25, a communication interface 26, and a communication bus 27. Wherein the memory 22 is used for storing a computer program, which is loaded and executed by the processor 21 to implement the relevant steps in the message consumption method disclosed in any of the foregoing embodiments. In addition, the electronic device 20 in the present embodiment may be specifically an electronic computer.
In this embodiment, the power supply 23 is configured to provide a working voltage for each hardware device on the electronic device 20; the communication interface 26 can create a data transmission channel between the electronic device 20 and an external device, and the communication protocol followed by the communication interface is any communication protocol that can be applied to the technical solution of the present application, and is not specifically limited herein; the input/output interface 25 is configured to obtain external input data or output data to the outside, and a specific interface type thereof may be selected according to specific application requirements, which is not specifically limited herein.
In addition, the storage 22 may be a carrier for storing resources, such as a read-only memory, a random access memory, a magnetic disk or an optical disk, etc., the resources stored thereon may include an operating system 221, a computer program 222, virtual machine data 223, etc., and the virtual machine data 223 may include various data. The storage means may be transient storage or permanent storage.
The operating system 221 is used for managing and controlling each hardware device on the electronic device 20 and the computer program 222, and may be Windows Server, netware, unix, linux, or the like. The computer program 222 may further include a computer program that can be used to perform other specific tasks in addition to the computer program that can be used to perform the message consumption method disclosed in any of the foregoing embodiments and executed by the electronic device 20.
Further, the present application discloses a computer-readable storage medium, wherein the computer-readable storage medium includes a Random Access Memory (RAM), a Memory, a Read-Only Memory (ROM), an electrically programmable ROM, an electrically erasable programmable ROM, a register, a hard disk, a magnetic disk, or an optical disk or any other form of storage medium known in the art. Wherein the computer program when executed by a processor implements the message consumption method disclosed above. For the specific steps of the method, reference may be made to the corresponding contents disclosed in the foregoing embodiments, which are not described herein again.
The embodiments are described in a progressive manner, each embodiment focuses on differences from other embodiments, and the same or similar parts among the embodiments are referred to each other. The device disclosed by the embodiment corresponds to the method disclosed by the embodiment, so that the description is simple, and the relevant points can be referred to the method part for description. Those of skill would further appreciate that the various illustrative elements and algorithm steps described in connection with the embodiments disclosed herein may be implemented as electronic hardware, computer software, or combinations of both, and that the various illustrative components and steps have been described above generally in terms of their functionality in order to clearly illustrate this interchangeability of hardware and software. Whether such functionality is implemented as hardware or software depends upon the particular application and design constraints imposed on the implementation. Skilled artisans may implement the described functionality in varying ways for each particular application, but such implementation decisions should not be interpreted as causing a departure from the scope of the present application.
The steps of a method or algorithm described in connection with the embodiments disclosed herein may be embodied directly in hardware, in a software module executed by a processor, or in a combination of the two. A software module may reside in Random Access Memory (RAM), memory, read-only memory (ROM), electrically programmable ROM, electrically erasable programmable ROM, registers, hard disk, a removable disk, a CD-ROM, or any other form of storage medium known in the art.
Finally, it should also be noted that, herein, relational terms such as first and second, and the like may be used solely to distinguish one entity or action from another entity or action without necessarily requiring or implying any actual such relationship or order between such entities or actions. Also, the terms "comprises," "comprising," or any other variation thereof, are intended to cover a non-exclusive inclusion, such that a process, method, article, or apparatus that comprises a list of elements does not include only those elements but may include other elements not expressly listed or inherent to such process, method, article, or apparatus. Without further limitation, an element defined by the phrase "comprising a … …" does not exclude the presence of another identical element in a process, method, article, or apparatus that comprises the element.
The message consumption method, device, equipment and storage medium provided by the present invention are described in detail above, and a specific example is applied in the present document to explain the principle and the implementation of the present invention, and the description of the above embodiment is only used to help understand the method and the core idea of the present invention; meanwhile, for a person skilled in the art, according to the idea of the present invention, the specific embodiments and the application range may be changed, and in summary, the content of the present specification should not be construed as a limitation to the present invention.

Claims (10)

1. A message consumption method, comprising:
pulling a first target message from a preset message publishing system by using a preset consumer component, and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs; wherein the preset consumer component pulls messages from a plurality of partitions in the preset message publishing system;
controlling the target partition worker component to process the first target message by utilizing a preset service executor component;
after the first target message is processed, marking the running state of the target partition worker component as a finished state, and recording the displacement of the target partition worker component;
when the preset consumer component monitors that the running state of the target partition worker component is a completion state, acquiring the displacement of the target partition worker component, and manually submitting the displacement to the preset message issuing system.
2. The message consumption method of claim 1, wherein the sending the first targeted message to a targeted partition worker component that uniquely corresponds to the targeted partition to which the first targeted message belongs comprises:
determining a message identifier of the first target message, and determining a target partition based on a target subject to which the first target message belongs and a partition identifier in the message identifier;
and sending the first target message to a target partition worker component uniquely corresponding to the target partition.
3. The message consumption method of claim 1, wherein the sending the first targeted message to a targeted partition worker component that uniquely corresponds to the targeted partition to which the first targeted message belongs comprises:
sending the first target message to a message queue of a target partition worker component uniquely corresponding to a target partition to which the first target message belongs;
correspondingly, the controlling the target partition worker component to process the first target message by using a preset service executor component, and after the first target message is processed, marking the running state of the target partition worker component as a finished state, and recording the displacement of the target partition worker component includes:
and controlling the target partition worker component to process the first target message in the message queue by using a preset service executor component, marking the running state of the target partition worker component as a finished state after all messages in the message queue are processed, and recording the displacement of the target partition worker component.
4. The message consumption method of claim 3, wherein the controlling the target partition worker component to process the first target message in the message queue by using a preset business executor component, mark the running state of the target partition worker component as a complete state after all messages in the message queue are processed, and record the displacement of the target partition worker component comprises:
controlling the target partition worker component to call a first method predefined in a preset service executor component to process the messages in the message queue corresponding to the target partition worker component, and monitoring whether unprocessed messages exist in the message queue corresponding to the target partition worker component;
if unprocessed messages do not exist in the message queue corresponding to the target partition worker component, marking the running state of the target partition worker component as a completion state;
and calling a predefined second method in the preset service executor component to call a target function interface, and recording the displacement of the target partition worker component.
5. The message consumption method of claim 1, further comprising:
when the preset consumer component monitors that the running state of the target partition worker component is an unfinished state, acquiring and recording the displacement of the target partition worker component, and prohibiting executing the step of manually submitting the displacement to the preset message issuing system.
6. The message consumption method according to any one of claims 1 to 5, wherein after the step of pulling the first targeted message from the preset message publishing system by using the preset consumer component and sending the first targeted message to the targeted partition worker component uniquely corresponding to the targeted partition to which the first targeted message belongs, the method further comprises:
calling a preset application programming interface in the preset message publishing system to control the preset consumer component to skip the target partition when pulling the message next time so as to pull the message from the next partition of the target partition;
correspondingly, after the first target message processing is finished, the method further comprises the following steps:
and calling a preset application programming interface in the preset message publishing system to control the preset consumer component to pull a second target message from the target partition, and sending the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs.
7. The message consumption method according to claim 6, wherein the controlling the preset consumer component to pull a second target message from the target partition and send the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs comprises:
and determining the next position of the displacement of the target partition worker component in the target partition as a target message pulling position, controlling the preset consumer component to pull a second target message from the target message pulling position, and then sending the second target message to a target partition worker component uniquely corresponding to the target partition to which the second target message belongs.
8. A message consumption apparatus, comprising:
the message pulling module is used for pulling a first target message from a preset message publishing system by using a preset consumer component and sending the first target message to a target partition worker component uniquely corresponding to a target partition to which the first target message belongs; wherein the preset consumer component pulls messages from a plurality of partitions in the preset message publishing system;
the partition worker component working module is used for controlling the target partition worker component to process the first target message by using a preset service executor component, marking the running state of the target partition worker component as a completion state after the first target message is processed, and recording the displacement of the target partition worker component;
and the displacement submitting module is used for acquiring the displacement of the target partition worker component and manually submitting the displacement to the preset message issuing system when the preset consumer component monitors that the running state of the target partition worker component is a finished state.
9. An electronic device comprising a processor and a memory; wherein the processor, when executing the computer program stored in the memory, implements the message consumption method of any of claims 1 to 7.
10. A computer-readable storage medium for storing a computer program; wherein the computer program when executed by a processor implements the message consumption method of any of claims 1 to 7.
CN202211237370.8A 2022-10-11 2022-10-11 Message consumption method, device, equipment and medium Active CN115328664B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202211237370.8A CN115328664B (en) 2022-10-11 2022-10-11 Message consumption method, device, equipment and medium

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202211237370.8A CN115328664B (en) 2022-10-11 2022-10-11 Message consumption method, device, equipment and medium

Publications (2)

Publication Number Publication Date
CN115328664A CN115328664A (en) 2022-11-11
CN115328664B true CN115328664B (en) 2023-03-24

Family

ID=83914829

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202211237370.8A Active CN115328664B (en) 2022-10-11 2022-10-11 Message consumption method, device, equipment and medium

Country Status (1)

Country Link
CN (1) CN115328664B (en)

Families Citing this family (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN117290122B (en) * 2023-02-28 2025-02-25 北京荣大科技股份有限公司 A method for orderly production and consumption in multiple environments based on Kafka
CN116643870B (en) * 2023-07-24 2023-11-10 北方健康医疗大数据科技有限公司 Method, system and device for processing long-time task distribution and readable storage medium

Citations (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US20180217882A1 (en) * 2017-01-27 2018-08-02 International Business Machines Corporation Batch processing of messages
CN111078422A (en) * 2019-11-19 2020-04-28 泰康保险集团股份有限公司 Message processing method, message processing device, readable storage medium and electronic equipment

Family Cites Families (3)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN108874562B (en) * 2018-06-21 2022-08-02 北京顺丰同城科技有限公司 Distributed high-concurrency message queue pushing system
US10990459B2 (en) * 2019-08-30 2021-04-27 Chicago Mercantile Exchange Inc. Distributed threaded streaming platform reader
CN111314422A (en) * 2020-01-17 2020-06-19 平安医疗健康管理股份有限公司 Kafka-based message processing method and system, storage medium and computer equipment

Patent Citations (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US20180217882A1 (en) * 2017-01-27 2018-08-02 International Business Machines Corporation Batch processing of messages
CN111078422A (en) * 2019-11-19 2020-04-28 泰康保险集团股份有限公司 Message processing method, message processing device, readable storage medium and electronic equipment

Also Published As

Publication number Publication date
CN115328664A (en) 2022-11-11

Similar Documents

Publication Publication Date Title
CN107729139B (en) Method and device for concurrently acquiring resources
CN115328664B (en) Message consumption method, device, equipment and medium
CN110716793B (en) Method, device, equipment and storage medium for executing distributed transaction
US20170346782A1 (en) Monitoring of subscriber message processing in a publish/subscribe messaging environment
US20100287553A1 (en) System, method, and software for controlled interruption of batch job processing
US20120072579A1 (en) Monitoring cloud-runtime operations
US8538793B2 (en) System and method for managing real-time batch workflows
CN106844198A (en) A distributed scheduling automation test platform and method
CN109656725B (en) Message consumer switching method and device, storage medium and electronic equipment
CN111274052A (en) Data distribution method, server, and computer-readable storage medium
CN113032125A (en) Job scheduling method, device, computer system and computer-readable storage medium
US9229794B1 (en) Signaling service interface module
CN108008950B (en) Method and device for realizing user interface updating
CN111858007A (en) Method and device for task scheduling based on message middleware
CN114900449B (en) Resource information management method, system and device
CN101176068B (en) Apparatus, system, and method for facilitating communication between an enterprise information system and a customer
CN111176577A (en) Distributed block storage service command processing method, device, equipment and medium
CN114675940A (en) Application instance construction method, device and equipment
US9607275B2 (en) Method and system for integration of systems management with project and portfolio management
CN111897643B (en) Thread pool configuration system, method, device and storage medium
EP4024761A1 (en) Communication method and apparatus for multiple management domains
CN112416980A (en) Data service processing method, device and equipment
JP4516594B2 (en) Message transmission control method, message transmission control device, and message transmission control program
CN115984022B (en) Unified account checking method and device for distributed payment system
CN107911442B (en) Receiving response interface interaction method and device, computer equipment and storage medium

Legal Events

Date Code Title Description
PB01 Publication
PB01 Publication
SE01 Entry into force of request for substantive examination
SE01 Entry into force of request for substantive examination
GR01 Patent grant
GR01 Patent grant