Java – Apache Beam – integration test of an unbounded PCollection

We are building an integration test for our Apache Beam pipeline and have run into some problems. For background information, see

Details of our pipeline:

- We use PubsubIO as our data source (an unbounded PCollection).
- The intermediate transforms include a custom CombineFn and a very simple windowing/triggering strategy.
- Our final transform is JdbcIO, using org.neo4j.jdbc.Driver to write to Neo4j.

Current testing approach:

- Run Google Cloud's Pub/Sub emulator on the machine running the test.
- Build an in-memory Neo4j database and pass its URI into our pipeline options.
- Run the pipeline by calling OurPipeline.main(TestPipeline.convertToArgs(options)).
- Use Google Cloud's Java Pub/Sub client library to publish messages to the test topic (using the Pub/Sub emulator), from which PubsubIO will read.
- Data should flow through the pipeline and eventually reach our in-memory Neo4j instance.
- Make simple assertions about the presence of this data in Neo4j.

This is meant to be a simple integration test that verifies our entire pipeline behaves as expected.

The problem we are facing now is that when we run our pipeline, it blocks. We are using the DirectRunner and pipeline.run() (not pipeline.run().waitUntilFinish()), but the test seems to hang after starting the pipeline. Because this is an unbounded PCollection (running in streaming mode), the pipeline does not terminate, so any code after it is never reached.

So I have a few questions:

1) Is there any way to run the pipeline and then manually stop it later?

2) Is there any way to run the pipeline asynchronously? Ideally, it would just kick off the pipeline (which would then continuously poll Pub/Sub for data) and then move on to the code responsible for publishing to Pub/Sub.

3) Is this integration testing approach reasonable, or is there a better, more straightforward approach? Any information or guidance here would be appreciated.

If I can provide any additional code / background, please let me know – thank you!

Solution

You can run the pipeline asynchronously with the DirectRunner by setting the isBlockOnRun pipeline option to false. As long as you keep a reference to the returned PipelineResult, calling cancel() on that result should stop the pipeline.
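As a minimal sketch of that approach, the following starts an unbounded pipeline without blocking and cancels it afterwards. It assumes the beam-runners-direct-java artifact is on the classpath; the class name AsyncRunSketch and the method startAsync are hypothetical, and GenerateSequence is only a stand-in for PubsubIO. Because OurPipeline.main in the question returns nothing, the real entry point would presumably need to return or otherwise expose its PipelineResult.

```java
import java.io.IOException;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

class AsyncRunSketch {

  /** Starts an unbounded pipeline on the DirectRunner without blocking the caller. */
  static PipelineResult startAsync() {
    DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
    options.setRunner(DirectRunner.class);
    options.setBlockOnRun(false); // run() returns while the pipeline keeps executing

    Pipeline pipeline = Pipeline.create(options);
    // Stand-in for PubsubIO (assumption): an unbounded source, one element per second.
    pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)));
    return pipeline.run();
  }

  public static void main(String[] args) throws IOException {
    PipelineResult result = startAsync();
    // A real test would publish to the emulator and poll Neo4j at this point.
    PipelineResult.State state = result.cancel(); // stop the streaming pipeline
    System.out.println("Pipeline state after cancel: " + state);
  }
}
```

In a test, the publishing and assertion steps would sit between startAsync() and cancel(), ideally with cancel() in a finally block so a failed assertion does not leave the pipeline running.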

For your third question, your setup seems reasonable. However, if you want a smaller-scale test of the pipeline (requiring fewer components), you can encapsulate all of your processing logic in a custom PTransform. This PTransform should take inputs that have already been fully parsed from the input source and produce outputs that have not yet been prepared for the output sink.
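One way such a composite might look is sketched below. The class name CoreProcessing is hypothetical, and the fixed one-minute window with Count.perElement() is only a placeholder for the custom CombineFn and window/trigger strategy, which the question does not show.

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

/** Hypothetical composite holding everything between the source and the sink. */
class CoreProcessing
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {

  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> parsedInput) {
    return parsedInput
        // Placeholder windowing; the real pipeline would use its own strategy.
        .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Placeholder aggregation standing in for the custom CombineFn.
        .apply("Aggregate", Count.<String>perElement());
  }
}
```

The production pipeline would then read roughly as source, then new CoreProcessing(), then sink, while a test applies the same composite to an in-memory input and never touches Pub/Sub or Neo4j.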

When you do this, you can use either Create (which generally does not exercise triggering) or TestStream (which may, depending on how you construct the TestStream) to generate a finite amount of input data, apply this processing PTransform to that PCollection with the DirectRunner, and use PAssert on the output PCollection to verify that the pipeline generates the outputs you expect.
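A sketch of the TestStream variant follows, under several assumptions: the class name TestStreamSketch and method runCheck are hypothetical, the windowed count stands in for the real processing PTransform, and a plain Pipeline on the DirectRunner is used so the example needs no test harness (in a JUnit test, TestPipeline declared as a @Rule is the usual choice). PAssert may additionally need JUnit and Hamcrest on the classpath.

```java
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

class TestStreamSketch {

  /** Feeds a finite, timestamped stream through the logic and checks the output. */
  static PipelineResult.State runCheck() {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    Pipeline pipeline = Pipeline.create(options);

    // Three elements in the same one-minute window, then the watermark
    // jumps to infinity so the window closes and the pipeline can finish.
    TestStream<String> events =
        TestStream.create(StringUtf8Coder.of())
            .addElements(
                TimestampedValue.of("a", new Instant(0L)),
                TimestampedValue.of("b", new Instant(0L)),
                TimestampedValue.of("a", new Instant(1000L)))
            .advanceWatermarkToInfinity();

    // Placeholder for the processing PTransform under test.
    PCollection<KV<String, Long>> counts =
        pipeline
            .apply(events)
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(Count.<String>perElement());

    PAssert.that(counts).containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L));

    // With the DirectRunner, a failed PAssert surfaces as an error from run().
    return pipeline.run().waitUntilFinish();
  }
}
```

Replacing the TestStream with Create.of("a", "b", "a") gives the simpler bounded variant, at the cost of not exercising watermark-driven behaviour.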

For more information about testing, the Beam website covers these styles of tests in the Programming Guide and in a blog post about testing pipelines with TestStream.
