Java – build custom join logic in Cascading that is guaranteed to run map-side only
I have three Cascading pipes (one to be joined with the other two), as described below:
>LHSPipe – larger size
>RHSPipes – smaller, may fit in memory
The pseudocode is as follows. This example involves two joins:
If F1DecidingFactor = Yes, join LHSPipe with RHS lookup #1 on (LHSPipe.F1Input = RHSLookup#1.Join#F1) and set the lookup result (LHSPipe.F1Output = Result#F1). Otherwise, set LHSPipe.F1Output = N/A.
The same logic applies to the F2 calculation.
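To pin down the intended semantics, here is a plain-Java sketch of the F1 rule, with the RHS held in a HashMap the way a map-side join would keep it in the map task's memory. It does not use Cascading at all; the class name ConditionalLookup, the method resolve, and the sample keys and values are hypothetical, and returning null for a "Yes" row without a matching key is an assumption (it mimics an unmatched left join).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the F1 rule (no Cascading classes involved).
final class ConditionalLookup {
    static final String DEFAULT_VALUE = "N/A";

    // If the deciding factor is "Yes", join F1Input against the in-memory RHS
    // (F1Join -> F1Result); otherwise return the default "N/A".
    // Assumption: a "Yes" row with no matching key yields null, like an unmatched left join.
    static String resolve(String decidingFactor, String f1Input, Map<String, String> rhsLookup) {
        if ("Yes".equalsIgnoreCase(decidingFactor)) {
            return rhsLookup.get(f1Input);
        }
        return DEFAULT_VALUE;
    }

    public static void main(String[] args) {
        // Hypothetical contents of RHS lookup #1, small enough to keep in memory
        Map<String, String> rhsOne = new HashMap<>();
        rhsOne.put("0", "True");
        rhsOne.put("1", "False");

        System.out.println(resolve("Yes", "1", rhsOne)); // prints False
        System.out.println(resolve("No", "1", rhsOne));  // prints N/A
    }
}
```

The F2 calculation is the same call with the F2 fields and RHS lookup #2.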
Expected output:
Because an if-else decides whether to join at all, I am forced to write a custom join operation.
Given this, I want the join to happen map-side (keeping the RHS pipes in memory on the map task nodes). I am considering the following options, each with its own advantages and disadvantages, and I would like your advice on them:
Option 1:
CoGroup – we can use CoGroup with a BufferJoiner and a custom join operation to build custom join logic, but we cannot ensure a map-side join.
Option 2:
HashJoin – it guarantees a map-side join, but as far as I can see, it does not allow building custom join logic this way.
Please correct my understanding and suggest how to handle this requirement.
Thank you in advance.
Solution
The best way to solve this problem that I can think of is to modify your smaller dataset. You can add a new field (F1DecidingFactor) to the smaller dataset. The value of F1Result can then be set as follows:
Pseudocode:
if F1DecidingFactor == "Yes" then F1Result = ACTUAL_VALUE else F1Result = "N/A"
Result table:
|F1#DecidingFactor|F1#Join|F1#Result|
|---|---|---|
|Yes|0|True|
|Yes|1|False|
|No|0|N/A|
|No|1|N/A|
You can also build this modified dataset with Cascading itself.
After that, you can do the join map-side with HashJoin.
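A minimal plain-Java sketch of this idea, assuming the small dataset starts as a simple F1Join → actual value mapping: each entry is expanded into a "Yes" row carrying the actual value and a "No" row carrying "N/A", and the large side then probes with the composite key (F1DecidingFactor, F1Input), as the map-side join would. PrecomputedLookup, buildRhs, probe and the "|" key separator are hypothetical illustration names, not Cascading API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the pre-modified small dataset.
final class PrecomputedLookup {
    // Composite join key (F1DecidingFactor, F1Join), e.g. "Yes|1".
    // Assumes "|" never appears inside the values themselves.
    static String key(String decidingFactor, String joinValue) {
        return decidingFactor + "|" + joinValue;
    }

    // Expands each F1Join -> actual value entry into two rows:
    // ("Yes", join) -> actual value and ("No", join) -> "N/A".
    static Map<String, String> buildRhs(Map<String, String> actualResults) {
        Map<String, String> rhs = new HashMap<>();
        for (Map.Entry<String, String> e : actualResults.entrySet()) {
            rhs.put(key("Yes", e.getKey()), e.getValue());
            rhs.put(key("No", e.getKey()), "N/A");
        }
        return rhs;
    }

    // Probe with (F1DecidingFactor, F1Input) from the large pipe;
    // null means the join value was not found at all.
    static String probe(Map<String, String> rhs, String decidingFactor, String f1Input) {
        return rhs.get(key(decidingFactor, f1Input));
    }

    public static void main(String[] args) {
        Map<String, String> actual = new HashMap<>();
        actual.put("0", "True");
        actual.put("1", "False");
        Map<String, String> rhs = buildRhs(actual);

        System.out.println(probe(rhs, "Yes", "0")); // prints True
        System.out.println(probe(rhs, "No", "0"));  // prints N/A
    }
}
```

Because every (deciding factor, join value) combination is present on the small side, no post-join function is needed: the "No" rows already carry "N/A". The trade-off is that the small dataset doubles in size, which matters when it has to fit in memory.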
If it is not possible to modify the smaller dataset, I see two options:
Option 1
Add a new field to your small pipe that holds the deciding-factor value a row must have to be joined (i.e. F1DecidingFactor_RHS = "Yes"), and then include it in your join key. Once the join is complete, only the rows that match both the join value and the deciding factor carry lookup values; for all other rows those fields will be null/blank. Example code:
Main class
import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {

    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        // Composite join keys: deciding factor + join value on each side
        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);
        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");
        // Add the constant deciding factor. Expected fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");
        // Add the constant deciding factor. Expected fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Join the first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Join the second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        // F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        // Rename the lookup results to the output names TestFunction expects
        Pipe result = new Rename(resultsTwo, new Fields("F1Result", "F2Result"), new Fields("F1Output", "F2Output"));
        // Set "N/A" where the deciding factor is "No" (those rows are null after the join)
        result = new Each(result, functionFields, new TestFunction(), Fields.REPLACE);
        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);
        // result pipe should now hold the expected result
    }
}
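To show what each HashJoin above computes, here is a plain-Java model of the first one (no Cascading involved): the small side is keyed on the inserted constant "Yes" plus F1Join, and every large-side row probes with (F1DecidingFactor, F1Input) and is kept even when nothing matches. ConstantKeyLeftJoin, build and leftJoin are hypothetical names, and rows are simplified to String arrays for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of one HashJoin + LeftJoin from Option 1.
final class ConstantKeyLeftJoin {
    // Build side: every small-pipe row gets the constant "Yes"
    // (like Insert(F1DecidingFactor_RHS, "Yes")) and is keyed on (constant, F1Join).
    static Map<List<String>, String> build(Map<String, String> f1JoinToResult) {
        Map<List<String>, String> table = new HashMap<>();
        for (Map.Entry<String, String> e : f1JoinToResult.entrySet()) {
            table.put(Arrays.asList("Yes", e.getKey()), e.getValue());
        }
        return table;
    }

    // Probe side with left-join semantics: each large-pipe row
    // {F1DecidingFactor, F1Input} yields one F1Result, or null if unmatched.
    static List<String> leftJoin(List<String[]> largeRows, Map<List<String>, String> table) {
        List<String> f1Results = new ArrayList<>();
        for (String[] row : largeRows) {
            f1Results.add(table.get(Arrays.asList(row[0], row[1])));
        }
        return f1Results;
    }

    public static void main(String[] args) {
        Map<String, String> small = new HashMap<>();
        small.put("0", "True");
        small.put("1", "False");

        List<String[]> large = new ArrayList<>();
        large.add(new String[] {"Yes", "1"});
        large.add(new String[] {"No", "1"});

        System.out.println(leftJoin(large, build(small))); // prints [False, null]
    }
}
```

Rows whose deciding factor is "No" can never match the constant "Yes" key, so they come out as null, which is the empty/blank result this option describes.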
Option 2
If you want a default value instead of null/blank, I recommend doing the HashJoin with the default joiners first and then using a function to update each tuple with the appropriate value, like this:
Main class
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {

    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Join the first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Join the second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        // Rename the lookup results to the output names TestFunction expects,
        // then overwrite them with "N/A" wherever the deciding factor is "No"
        Pipe result = new Rename(resultsTwo, new Fields("F1Result", "F2Result"), new Fields("F1Output", "F2Output"));
        result = new Each(result, functionFields, new TestFunction(), Fields.REPLACE);
        // result pipe should now hold the expected result
    }
}
Update function
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;
    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected argument fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        // Declare the same fields as the arguments, so it can be used with Fields.REPLACE
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();
        // Work on a copy; the argument entry itself must not be modified
        TupleEntry result = new TupleEntry(arguments);

        // Call equalsIgnoreCase on the constant so a null deciding factor does not throw
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F1DecidingFactor"))) {
            result.setString("F1Output", DEFAULT_VALUE);
        }
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F2DecidingFactor"))) {
            result.setString("F2Output", DEFAULT_VALUE);
        }
        call.getOutputCollector().add(result);
    }
}
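The update step can also be modeled outside Cascading. This sketch assumes a joined row is represented as a Map from field name to value (a hypothetical stand-in for TupleEntry); DefaultFiller and applyDefaults are illustrative names. Like TestFunction, it copies the row and overwrites F1Output/F2Output with "N/A" where the matching deciding factor is "No".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of TestFunction's per-row update.
final class DefaultFiller {
    static final String DECIDING_FACTOR = "No";
    static final String DEFAULT_VALUE = "N/A";

    // Copies the joined row (like new TupleEntry(arguments)) and, for F1 and F2,
    // overwrites <prefix>Output with "N/A" when <prefix>DecidingFactor is "No".
    static Map<String, String> applyDefaults(Map<String, String> joinedRow) {
        Map<String, String> result = new HashMap<>(joinedRow);
        for (String prefix : new String[] {"F1", "F2"}) {
            // Constant first, so a missing deciding factor (null) does not throw
            if (DECIDING_FACTOR.equalsIgnoreCase(result.get(prefix + "DecidingFactor"))) {
                result.put(prefix + "Output", DEFAULT_VALUE);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("F1DecidingFactor", "Yes");
        row.put("F1Output", "True");  // value found by the plain left join
        row.put("F2DecidingFactor", "No");
        row.put("F2Output", "False"); // joined, but must be overridden

        Map<String, String> fixed = applyDefaults(row);
        System.out.println(fixed.get("F1Output") + " " + fixed.get("F2Output")); // prints True N/A
    }
}
```

Because the join in this option ignores the deciding factor, rows with "No" may still pick up a real lookup value; the function is what turns those into "N/A" afterwards.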
References:
>Insert function
>Custom function
>HashJoin
This should solve your problem. Let me know if it helps.