Java – Flink streaming: how to write a data stream to different outputs depending on the data?
In Apache Flink, I have a stream of tuples. Let's assume a very simple Tuple1<String>. The tuple can have an arbitrary value in its value field (e.g. "P1", "P2", etc.). The set of possible values is finite, but I don't know the full set beforehand (so there could be a "P362"). I want to write each tuple to a different output location depending on the value inside the tuple. For example, I'd like to end up with the following file structure:
> /output/P1
> /output/P2
In the documentation I only found ways to write to a location that I know in advance (e.g. stream.writeCsv("/output/here")), but no way to let the contents of the data decide where the data actually ends up.
I read about output splitting in the documentation, but that doesn't seem to provide a way to redirect the output to the different destinations the way I want it (or I don't understand how that would work).
Can this be done with the Flink API? If not, is there perhaps a third-party library that can do it, or would I have to build one myself?
Update
Following Matthias' suggestion, I came up with a sifting sink function which determines the output path and then writes the tuple to the corresponding file after serializing it. I put it here for reference; maybe it is useful to others:
public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT, String> serializationFunction;
    private final String basePath;
    Map<String, TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended with the output selector.
     */
    public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) {
        this.outputSelector = outputSelector;
        this.serializationFunction = serializationFunction;
        this.basePath = basePath;
    }

    @Override
    public void invoke(IT value) throws Exception {
        // find out where to write.
        Iterable<String> selection = outputSelector.select(value);
        for (String s : selection) {
            // ensure we have a format for this.
            TextOutputFormat<String> destination = ensureDestinationExists(s);
            // then serialize and write.
            destination.writeRecord(serializationFunction.map(value));
        }
    }

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException {
        // if we know the destination, we just return the format.
        if (formats.containsKey(selection)) {
            return formats.get(selection);
        }

        // create a new output format and initialize it from the context.
        TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection));
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        format.configure(context.getTaskStubParameters());
        format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());

        // put it into our map.
        formats.put(selection, format);
        return format;
    }

    @Override
    public void close() throws IOException {
        Exception lastException = null;
        try {
            for (TextOutputFormat<String> format : formats.values()) {
                try {
                    format.close();
                } catch (Exception e) {
                    lastException = e;
                    format.tryCleanupOnError();
                }
            }
        } finally {
            formats.clear();
        }
        if (lastException != null) {
            throw new IOException("Close failed.", lastException);
        }
    }
}
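To make the core pattern in the sink above easier to follow outside of Flink, here is a minimal stdlib-only sketch of the same idea: lazily create one writer per destination the first time that destination is seen, and route each record to it. The names (SiftingWriter, selector, serializer) are hypothetical, and a StringBuilder stands in for the TextOutputFormat used in the real sink:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustration of the "sifting" pattern: one lazily-created buffer per
// destination, with a selector picking the destination and a serializer
// turning each record into an output line.
class SiftingWriter<T> {
    private final Function<T, String> selector;   // picks the destination, e.g. "P1"
    private final Function<T, String> serializer; // serializes a record to one line
    private final Map<String, StringBuilder> outputs = new HashMap<>();

    SiftingWriter(Function<T, String> selector, Function<T, String> serializer) {
        this.selector = selector;
        this.serializer = serializer;
    }

    void write(T record) {
        String destination = selector.apply(record);
        // lazily create the per-destination buffer, mirroring ensureDestinationExists()
        outputs.computeIfAbsent(destination, d -> new StringBuilder())
               .append(serializer.apply(record)).append('\n');
    }

    Map<String, StringBuilder> outputs() {
        return outputs;
    }
}
```

In the real sink the buffers are open output formats that must also be closed (and cleaned up on error), which is what the close() method above takes care of.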
Solution
You can implement a custom sink. Inherit from one of these two:
> org.apache.flink.streaming.api.functions.sink.SinkFunction
> org.apache.flink.streaming.api.functions.sink.RichSinkFunction
Then use it in your program:
stream.addSink(SinkFunction<T> sinkFunction);
instead of stream.writeCsv("/output/somewhere").
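To show the shape of such a custom sink without pulling in Flink, here is a hedged stand-in: MySinkFunction mimics the single invoke(value) method you override on Flink's SinkFunction, and CollectingSink is a hypothetical implementation that just collects records in memory (a real sink would write to files, sockets, databases, etc.):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's SinkFunction interface: a custom sink is just a
// class with an invoke(...) method that is called once per record.
interface MySinkFunction<T> {
    void invoke(T value) throws Exception;
}

// A tiny custom sink that collects records in memory, so the routing
// or writing logic you put inside invoke(...) can be tested in isolation.
class CollectingSink<T> implements MySinkFunction<T> {
    final List<T> collected = new ArrayList<>();

    @Override
    public void invoke(T value) {
        collected.add(value);
    }
}
```

With the real API, an instance of your sink class is what you pass to stream.addSink(...), as in the answer above.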