Java – building custom join logic in Cascading while keeping the join map-side

I have three Cascading pipes (one LHS pipe joined to the other two), as described below:

>LhsPipe – (larger in size)

>RhsPipes – (small enough to fit in memory)

The pseudocode is as follows. This example involves two joins:

If F1DecidingFactor == "Yes", use RHS lookup #1 (joining on LhsPipe.F1Input = RhsLookup1.F1Join) and set the lookup result on LhsPipe (LhsPipe.F1Output = RhsLookup1.F1Result). Otherwise, set LhsPipe.F1Output = "N/A".

The same logic applies to the F2 calculation.
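Outside Cascading, the per-row rule above can be sketched in plain Java (the class and method names here are hypothetical, for illustration only):

```java
import java.util.Map;

// Hypothetical stand-alone sketch of the per-row lookup rule described above:
// consult the RHS lookup only when the deciding factor is "Yes".
public class ConditionalLookup {
    public static String resolve(String decidingFactor, String joinKey,
                                 Map<String, String> rhsLookup) {
        if ("Yes".equalsIgnoreCase(decidingFactor)) {
            // Map-side lookup; a missing key also falls back to "N/A".
            return rhsLookup.getOrDefault(joinKey, "N/A");
        }
        return "N/A";
    }
}
```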

Expected behavior:

The if-else decides whether to join or not, which forces me into a custom join operation.

Given the above, I want a map-side join (keeping the RhsPipes in memory on the map task nodes). I am considering the following possible solutions, each with advantages and disadvantages, and I need your advice on them.

Option 1:

CoGroup – we can build the custom join logic using CoGroup with a BufferJoiner and a custom join operation, but we cannot guarantee a map-side join.

Option 2:

HashJoin – it guarantees a map-side join, but as far as I can see there is no way to plug custom join logic into it.

Please correct my understanding and share your thoughts on how to handle this requirement.

Thanks in advance.

Solution

The best way to solve this (that I can think of) is to modify your smaller dataset. You can add a new field (F1DecidingFactor) to the smaller dataset, with F1Result derived as follows:

Pseudocode:

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

Result table

|F1#DecidingFactor|F1#Join|F1#Result|
|-----------------|-------|---------|
|Yes|0|True|
|Yes|1|False|
|No|0|N/A|
|No|1|N/A|

You can also do this preprocessing within Cascading itself.

After that, you can do the join map-side.

If modifying the smaller dataset is not possible, I have two options to solve the problem.
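The preprocessing step amounts to duplicating each small-dataset row so the deciding factor becomes part of the join key, as in the result table above. A minimal plain-Java sketch (the class name and row layout here are hypothetical, assuming each input row is `{joinKey, actualResult}`):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: expand each small-dataset row into two rows,
// one per deciding-factor value, so the factor can join directly.
public class ExpandSmallDataset {
    // Each output row: {decidingFactor, joinKey, result}
    public static List<String[]> expand(List<String[]> rows) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            String joinKey = row[0], actual = row[1];
            out.add(new String[] {"Yes", joinKey, actual}); // factor Yes -> real value
            out.add(new String[] {"No",  joinKey, "N/A"});  // factor No  -> default
        }
        return out;
    }
}
```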

Option 1

Add a new field to your small pipe that mirrors your deciding factor (i.e. F1DecidingFactor_RHS = "Yes"), then include it in your join condition. Once the join completes, only the rows that match these criteria will carry values; the rest will be null/blank. Example code:

Main class

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {
    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);

        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor","F1Output","F2DecidingFactor","F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field to small pipe. Expected Fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne,new Insert(f1DecidingFactorRhs,"Yes"),Fields.ALL);

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field to small pipe. Expected Fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo,new Insert(f2DecidingFactorRhs,"Yes"),Fields.ALL);

        // Joining first small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe,lhsJoinerOne,rhsOne,rhsJoinerOne,new LeftJoin());

        // Joining second small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne,lhsJoinerTwo,rhsTwo,rhsJoinerTwo,new LeftJoin());

        Pipe result = new Each(resultsTwo,functionFields,new TestFunction(),Fields.REPLACE);

        result = new Discard(result,f1DecidingFactorRhs);
        result = new Discard(result,f2DecidingFactorRhs);

        // result Pipe should have expected result
    }
}

Option 2

If you want a default value instead of null/blank, I recommend doing the HashJoin with the default joiners first, and then using a function to update the tuples with the appropriate values. Something like:

Main class

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {
    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor","F1Output","F2DecidingFactor","F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe,f1Input,rhsOne,f1Join,new LeftJoin());

        // Joining second small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne,f2Input,rhsTwo,f2Join,new LeftJoin());

        // Update null values to defaults with the function below
        Pipe result = new Each(resultsTwo,functionFields,new TestFunction(),Fields.REPLACE);

        // result Pipe should have expected result
    }
}

Update function

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected Fields: "F1DecidingFactor","F1Output","F2DecidingFactor","F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process,FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        TupleEntry result = new TupleEntry(arguments);

        if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F1Output",DEFAULT_VALUE);
        }

        if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F2Output",DEFAULT_VALUE);
        }

        call.getOutputCollector().add(result);
    }

}
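Stripped of the Cascading plumbing, the fix-up that TestFunction performs can be sketched in plain Java as follows (the class and method names here are hypothetical):

```java
// Plain-Java sketch of the post-join fix-up: wherever a deciding factor
// is "No", overwrite the joined value with the "N/A" default.
public class ApplyDefaults {
    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Returns {f1Output, f2Output} after applying the defaults.
    public static String[] fixUp(String f1Factor, String f1Output,
                                 String f2Factor, String f2Output) {
        if (DECIDING_FACTOR.equalsIgnoreCase(f1Factor)) f1Output = DEFAULT_VALUE;
        if (DECIDING_FACTOR.equalsIgnoreCase(f2Factor)) f2Output = DEFAULT_VALUE;
        return new String[] {f1Output, f2Output};
    }
}
```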

References

> Insert Function > Custom Function > HashJoin

This should solve your problem. Let me know if it helps.
