Java – Apache Beam – integration test of unbounded PCollection
We are building integration tests for our Apache Beam pipeline and have run into some problems. For background information, see
Details of our pipeline:
- We use PubsubIO as our data source (an unbounded PCollection).
- Our intermediate transforms include a custom CombineFn and a very simple windowing/triggering strategy.
- Our final transform is JdbcIO, which uses org.neo4j.jdbc.Driver to write to Neo4j.
Our current testing approach:
- Run Google Cloud's Pub/Sub emulator on the machine running the tests.
- Build an in-memory Neo4j database and pass its URI to our pipeline options.
- Run the pipeline by calling OurPipeline.main(TestPipeline.convertToArgs(options)).
- Use Google Cloud's Java Pub/Sub client library to publish messages to a test topic (on the Pub/Sub emulator), from which PubsubIO will read.
- Data should flow through the pipeline and eventually reach our in-memory Neo4j instance.
- Make simple assertions about the presence of this data in Neo4j.
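A minimal sketch of the options step, assuming the pipeline reads through PubsubIO with the standard PubsubOptions: setting pubsubRootUrl points PubsubIO at the emulator instead of the real service, and TestPipeline.convertToArgs turns the options back into command-line arguments for OurPipeline.main. The OurPipelineOptions interface, its neo4jUri property, the EmulatorArgs class, and the emulator address localhost:8085 (the emulator's usual default) are hypothetical stand-ins, not the pipeline's actual code.

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;

public class EmulatorArgs {

  /** Hypothetical options interface; the real pipeline's option names may differ. */
  public interface OurPipelineOptions extends PubsubOptions {
    @Description("Connection URI of the (in-memory) Neo4j database")
    String getNeo4jUri();

    void setNeo4jUri(String value);
  }

  /**
   * Builds command-line args that point PubsubIO at a local emulator
   * (e.g. "localhost:8085") and pass the Neo4j URI through to the pipeline.
   */
  public static String[] buildArgs(String emulatorHostPort, String neo4jUri) {
    OurPipelineOptions options = PipelineOptionsFactory.as(OurPipelineOptions.class);
    // PubsubIO sends its requests to this root URL instead of pubsub.googleapis.com.
    options.setPubsubRootUrl("http://" + emulatorHostPort);
    options.setNeo4jUri(neo4jUri);
    // Serializes the explicitly set options as "--name=value" strings.
    return TestPipeline.convertToArgs(options);
  }

  public static void main(String[] args) {
    for (String arg : buildArgs("localhost:8085", "bolt://localhost:7687")) {
      System.out.println(arg);
    }
    // The resulting array would then be handed to OurPipeline.main(...).
  }
}
```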
This is a simple integration test that verifies our entire pipeline is working as expected.
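Because the data only eventually reaches Neo4j, the final assertion step usually has to poll with a timeout rather than check once. A minimal, dependency-free sketch of such a helper follows; EventuallyAssert and awaitTrue are hypothetical names, and the Neo4j query you would pass in is left to the caller.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

public final class EventuallyAssert {
  private EventuallyAssert() {}

  /**
   * Re-evaluates the condition every pollInterval until it returns true or the
   * timeout elapses. Returns whether the condition was eventually met.
   */
  public static boolean awaitTrue(BooleanSupplier condition, Duration timeout, Duration pollInterval)
      throws InterruptedException {
    Instant deadline = Instant.now().plus(timeout);
    while (true) {
      if (condition.getAsBoolean()) {
        return true; // data has arrived
      }
      if (Instant.now().isAfter(deadline)) {
        return false; // give up so the test fails instead of hanging
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }
}
```

For example, a test might call awaitTrue(() -> countNodes() >= 3, Duration.ofSeconds(30), Duration.ofMillis(500)), where countNodes() is a hypothetical helper that queries the in-memory Neo4j instance, and fail if it returns false.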
The problem we're facing is that running the pipeline blocks. We are using the DirectRunner and pipeline.run() (not pipeline.run().waitUntilFinish()), but the test still seems to hang after the pipeline starts. Because this is an unbounded PCollection (running in streaming mode), the pipeline never terminates, so none of the code after it is ever reached.
So I have a few questions:
1) Is there any way to run the pipeline and then manually stop it later?
2) Is there a way to run the pipeline asynchronously? Ideally, the test would start the pipeline (which would then keep polling Pub/Sub for data) and then move on to the code responsible for publishing to Pub/Sub.
3) Is this integration testing approach reasonable, or is there a better, more direct approach? Any information or guidance here would be appreciated.
If I can provide any additional code / background, please let me know – thank you!
Solution
You can run pipelines asynchronously on the DirectRunner by setting the blockOnRun pipeline option (exposed as DirectOptions.isBlockOnRun) to false. As long as you keep a reference to the PipelineResult that run() returns, calling cancel() on it should stop the pipeline.
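A minimal sketch of this, assuming the Beam DirectRunner artifact is on the classpath. GenerateSequence with a rate stands in for PubsubIO as an unbounded source, and NonBlockingRunExample is a hypothetical class name. Note that if the pipeline is started from inside OurPipeline.main, the test never sees the PipelineResult, so the pipeline construction would likely need to move into a method that returns it; passing --blockOnRun=false on its own would let main return but leave nothing to cancel.

```java
import java.io.IOException;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

public class NonBlockingRunExample {

  /** Starts an unbounded pipeline on the DirectRunner and returns without waiting for it. */
  public static PipelineResult startUnboundedPipeline() {
    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
    options.setRunner(DirectRunner.class);
    // With blockOnRun=false, run() returns once the pipeline has started.
    options.setBlockOnRun(false);

    Pipeline pipeline = Pipeline.create(options);
    // Stand-in for PubsubIO: an unbounded source emitting one element per second, forever.
    pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)));
    return pipeline.run();
  }

  public static void main(String[] args) throws IOException {
    PipelineResult result = startUnboundedPipeline();
    System.out.println("Pipeline started, state: " + result.getState());

    // ... publish test messages and poll the sink here ...

    // Stop the streaming pipeline so the test can finish.
    result.cancel();
    System.out.println("After cancel(), state: " + result.getState());
  }
}
```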
For the third question, your setup seems reasonable. However, if you want to test the pipeline at a smaller scale (requiring fewer components), you can encapsulate all of the processing logic in a custom PTransform. This PTransform should take inputs that have already been fully parsed from the input source and produce outputs that have not yet been converted for the output sink.
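A minimal sketch of such a composite transform, assuming String events as the parsed input and per-key counts as the not-yet-converted output. ProcessEvents and ProcessEventsExample are hypothetical names, and Count.perElement() is swapped in for the pipeline's custom CombineFn. The check uses Create, so it verifies the processing logic but not trigger behavior.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class ProcessEventsExample {

  /**
   * Hypothetical composite transform holding the processing logic between the
   * source (already-parsed events) and the sink (counts not yet turned into
   * JdbcIO statements). Count.perElement() stands in for the custom CombineFn.
   */
  public static class ProcessEvents
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> events) {
      return events
          .apply("FixedWindows", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
          .apply("CountPerKey", Count.perElement());
    }
  }

  /** Runs ProcessEvents on a small bounded input and checks its output with PAssert. */
  public static void runCheck() {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, Long>> counts =
        p.apply(Create.of("a", "b", "a")).apply(new ProcessEvents());
    PAssert.that(counts).containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L));
    // On the DirectRunner, a failing PAssert surfaces as an exception here.
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    runCheck();
    System.out.println("ProcessEvents produced the expected counts");
  }
}
```

In the production pipeline, the same transform would sit between the PubsubIO read (plus parsing) and the conversion for JdbcIO, so the test and the real pipeline share the logic under test.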
When doing this, you can use Create (which generally does not exercise triggers) or TestStream (which may, depending on how you build the TestStream) with the DirectRunner to generate a finite amount of input data, apply this PTransform to that PCollection, and use PAssert on the output PCollection to verify that your pipeline produces the output you expect.
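For trigger and watermark behavior, TestStream lets the test script when elements arrive and when the watermark advances. The sketch below checks the on-time pane of each window with PAssert; TestStreamExample is a hypothetical name, and the one-minute fixed windows with Count.perElement() are assumed in place of the real windowing strategy and CombineFn.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TestStreamExample {

  public static void runCheck() {
    Instant start = new Instant(0);
    Duration windowSize = Duration.standardMinutes(1);

    // Scripted unbounded input: timestamped elements, then explicit watermark moves.
    TestStream<String> events =
        TestStream.create(StringUtf8Coder.of())
            .addElements(
                TimestampedValue.of("a", start.plus(Duration.standardSeconds(10))),
                TimestampedValue.of("b", start.plus(Duration.standardSeconds(20))),
                TimestampedValue.of("a", start.plus(Duration.standardSeconds(30))))
            // Passing the end of the first window fires its on-time pane.
            .advanceWatermarkTo(start.plus(windowSize))
            .addElements(TimestampedValue.of("a", start.plus(Duration.standardSeconds(70))))
            .advanceWatermarkToInfinity();

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, Long>> counts =
        p.apply(events)
            .apply(Window.<String>into(FixedWindows.of(windowSize)))
            .apply(Count.perElement());

    IntervalWindow first = new IntervalWindow(start, windowSize);
    IntervalWindow second = new IntervalWindow(start.plus(windowSize), windowSize);
    PAssert.that(counts).inOnTimePane(first).containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L));
    PAssert.that(counts).inOnTimePane(second).containsInAnyOrder(KV.of("a", 1L));

    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    runCheck();
    System.out.println("Windowed counts matched the expected on-time panes");
  }
}
```

Inside a JUnit test, the idiomatic form replaces Pipeline.create(...) with a TestPipeline declared as @Rule public final transient TestPipeline p = TestPipeline.create();, which also checks, among other things, that the pipeline was actually run.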
For more information on testing, the Beam website describes these testing styles in the Programming Guide and in a blog post about testing pipelines with TestStream.