Correctly handle null observable in rxjava

I have a situation where I'm creating an observable that contains database results I'm applying a series of filters to them Then I have a subscriber to record the results When possible, no element passes through the filter My business logic shows that this is not a mistake However, when this happens, my onerror is called and contains the following exception: Java util. NoSuchElementException: sequence does not contain an element

Is it acceptable to detect that type of exception and ignore it? Or is there a better way to deal with it?

The version is 1.0 0

This is a simple test case that exposes what I see It seems to be related to making all events filtered and reduced before reaching the map

@Test
public void test()
{

    Integer values[] = new Integer[]{1,2,3,4,5};

    Observable.from(values).filter(new Func1<Integer,Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer,String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String,String,String>()
    {
        @Override
        public String call(String s,String s2)
        {
            return s + "," + s2;
        }
    })

            .subscribe(new Action1<String>()
            {
                @Override
                public void call(String s)
                {
                    System.out.println(s);
                }
            });
}

Because I am using a secure user, it first throws an onerrornotimplementedexception, which contains the following exceptions:

java.util.NoSuchElementException: Sequence contains no elements
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
    at rx.Subscriber.setProducer(Subscriber.java:139)
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.subscribe(Observable.java:7284)

Based on the following @davem answers, I created a new test case:

@Test
public void testFromBlockingAndSingle()
{

    Integer values[] = new Integer[]{-2,-1,1,5};

    List<String> results = Observable.from(values).filter(new Func1<Integer," + s2;
        }
    }).toList().toBlocking().single();

    System.out.println("Test: " + results + " Size: " + results.size());

}

This test produces the following behavior:

When the input is:

Integer values[] = new Integer[]{-2,5};

Then the result (as expected) is:

Test: [-2,-1] Size: 1

When the input is:

Integer values[] = new Integer[]{0,5};

The result is the following stack trace:

java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

So it seems that the problem must be using the reduce function See test cases for the following two cases:

@Test
public void testNoReduce()
{

    Integer values[] = new Integer[]{-2,String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).toList().toBlocking().first();

    Iterator<String> itr = results.iterator();
    StringBuilder b = new StringBuilder();

    while (itr.hasNext())
    {
        b.append(itr.next());

        if (itr.hasNext())
            b.append(",");
    }

    System.out.println("Test NoReduce: " + b);

}

Enter by:

Integer values[] = new Integer[]{-2,5};

I got the following results:

Test NoReduce: -2,-1

And enter the following:

Integer values[] = new Integer[]{0,5};

I get the following output:

Test NoReduce:

Therefore, unless I completely misunderstand something, the only way to really deal with zero elements comes from the observable of the filter, followed by a map and reduce, which implements the reduce logic outside the observable chain Do you all agree with this statement?

Final solution

Implementing Tom á š Dvo ř After the suggestion of á K and David motten, this is my final solution I think this solution is reasonable

@Test
public void testWithToList()
{

    Integer values[] = new Integer[]{-2,5};

     Observable.from(values).filter(new Func1<Integer,Boolean>()
     {
         @Override
         public Boolean call(Integer integer)
         {
             if (integer < 0)
                 return true;
             else
                 return false;
         }
     }).toList().map(new Func1<List<Integer>,String>()
     {
         @Override
         public String call(List<Integer> integers)
         {
             Iterator<Integer> intItr = integers.iterator();
             StringBuilder b = new StringBuilder();

             while (intItr.hasNext())
             {
                 b.append(intItr.next());

                 if (intItr.hasNext())
                 {
                     b.append(",");
                 }
             }

             return b.toString();
         }
     }).subscribe(new Action1<String>()
     {
         @Override
         public void call(String s)
         {
             System.out.println("With a toList: " + s);
         }
     });

}

The following is the behavior of the test when the following inputs are given

When a stream with certain values is given to pass through the filter:

Integer values[] = new Integer[]{-2,5};

The result is:

With a toList: -2,-1

When a stream without any value is given to pass through the filter:

Integer values[] = new Integer[]{0,5};

The result is:

With a toList: <empty string>

Solution

Now after the update, the error is obvious The reduction in rxjava will fail and the illegalargumentexception will be null if the reduction can be observed, as per the specification( http://reactivex.io/documentation/operators/reduce.html )As shown in

In functional programming, there are usually two general operators to aggregate a set into a single value, collapse and shrink In accepted terms, fold takes the initial accumulator value and uses a function that runs the accumulator and the value from the set and produces another accumulator value An example of pseudo code:

[1,4] . Fold (0, (accumulator, value) = > accumulator value)

Start with 0, add 1 and 4 to the running accumulator, and finally get 10, the sum of these values

The reduction is very similar, except that the initial accumulator value is not explicitly used. It uses the first value as the initial accumulator, and then accumulates all the remaining values This makes sense if you are, for example, looking for a minimum or maximum

[1,4] . Reduce ((cumulative, value) = > min (accumulator, value))

Looking at the different ways of folding and reducing, you may use a compromise. Even in an empty set (e.g., sum, 0 is meaningful), the aggregate value is meaningful and reduced (the minimum is meaningless to the empty set and reduces the will. In this case, it cannot operate, and an exception is thrown in your case)

You are doing a similar aggregation, separating a set of strings with commas to produce a single string That is a somewhat difficult situation It may be that an empty set (you might expect an empty string) makes sense, but on the other hand, if you start with an empty accumulator, there will be more commas in the result than you expect The correct solution is to check whether the collection is empty and return the fallback string of the empty collection, or reduce the non empty collection You may observe that you usually do not need to use empty strings in empty collection cases, but "collection empty" may be more appropriate, so further ensure that your solution is clean

BTW, I use this word here instead of observing freely, just for educational purposes In addition, in rxjava, both fold and reduce are called the same. Only this method has two versions. One uses only one parameter and the other two parameters

As for your last question: you don't have to leave the observable chain Just use tolist (), as David motten said

.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(","))

Prose can be achieved in terms of reduction, if not library function (very common)

collection.intersperse(separator) = 
    if (collection.isEmpty()) 
      ""
    else
      collection.reduce(accumulator,element => accumulator + separator + element)
The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>