Java – Flink streaming: how to output a data stream to different outputs according to data?

In Apache Flink, I have a string of tuples Let's assume a very simple tuple1 < string > Tuples can have any value in their value field (for example, "P1", "P2", etc.) A set of possible values is limited, but I don't know all the preset values (so there may be a 'p362') I want to write the tuple to an output location, depending on the value in the tuple For example, I want to have the following file structure:

> / output / P1 > / output / P2

In the document, I only find the possibility of writing to the location I know in advance (for example, stream.writecsv ("/ output / here")), but there is no way to let the content of the data determine the location where the data actually ends

I read about output segmentation in documents, but it doesn't seem to provide a way to redirect output to different destinations. I want to have it (or I don't understand how this will work)

Can this be done using the Flink API? If not, is it possible for a third-party library to do this, or do I have to build one myself?

to update

According to Matthias' suggestion, I came up with a filter receiving function, which determines the output path, and then writes the tuple to the corresponding file after serialization I put it here for reference. Maybe it is useful to others:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT,String> serializationFunction;
    private final String basePath;
    Map<String,TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended with the output selector.
     */
    public SiftingSinkFunction(OutputSelector<IT> outputSelector,MapFunction<IT,String> serializationFunction,String basePath) {
        this.outputSelector = outputSelector;
        this.serializationFunction = serializationFunction;
        this.basePath = basePath;
    }


    @Override
    public void invoke(IT value) throws Exception {
        // find out where to write.
        Iterable<String> selection = outputSelector.select(value);
        for (String s : selection) {
            // ensure we have a format for this.
            TextOutputFormat<String> destination = ensureDestinationExists(s);
            // then serialize and write.
            destination.writeRecord(serializationFunction.map(value));
        }
    }

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException {
        // if we kNow the destination,we just return the format.
        if (formats.containsKey(selection)) {
            return formats.get(selection);
        }

        // create a new output format and initialize it from the context.
        TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath,selection));
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        format.configure(context.getTaskStubParameters());
        format.open(context.getIndexOfThisSubtask(),context.getNumberOfParallelSubtasks());

        // put it into our map.
        formats.put(selection,format);
        return format;
    }

    @Override
    public void close() throws IOException {
        Exception lastException = null;
        try {
            for (TextOutputFormat<String> format : formats.values()) {
                try {
                    format.close();
                } catch (Exception e) {
                    lastException = e;
                    format.tryCleanupOnError();
                }
            }
        } finally {
            formats.clear();
        }

        if (lastException != null) {
            throw new IOException("Close Failed.",lastException);
        }
    }
}

Solution

You can implement a custom receiver Inherit from one of the two:

> org. apache. flink. streaming. api. functions. sink. SinkFunction > org. apache. flink. streaming. api. functions. sink. RichSinkFunction

Use in your program:

stream.addSink(SinkFunction<T> sinkFunction);

Not stream writeCsv(“/ output / athere”).

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>