Akka distributed pub-sub: Java implementation does not work
Subscriber's main class: Application.java
    package com.mynamespace;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.ComponentScan;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.contrib.pattern.DistributedPubSubExtension;
    import akka.contrib.pattern.DistributedPubSubMediator;

    import com.mynamespace.actors.SubscriberActor;

    @SpringBootApplication
    @ComponentScan(basePackages = "com.mynamespace.*")
    public class Application {

        public static void main(String[] args) throws InterruptedException {
            ApplicationContext ctx = SpringApplication.run(Application.class, args);

            // get hold of the actor system
            ActorSystem system = ctx.getBean(ActorSystem.class);
            ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
            ActorRef subscriber = system.actorOf(Props.create(SubscriberActor.class), "subscriber");

            // register the subscriber with the mediator under its path (/user/subscriber);
            // note that this is a Put, not a Subscribe to a topic
            mediator.tell(new DistributedPubSubMediator.Put(subscriber), subscriber);
            // subscriber.tell("init", null);

            System.out.println("Running.");
            Thread.sleep(5000L);
        }
    }
Subscriber actor: SubscriberActor.java
    package com.mynamespace.actors;

    import java.util.ArrayList;
    import java.util.List;

    import akka.actor.UntypedActor;

    import com.mynamespace.message.CategoryServiceRequest;
    import com.mynamespace.message.CategoryServiceResponse;

    public class SubscriberActor extends UntypedActor {

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CategoryServiceRequest) {
                System.out.println("Request received for GetCategories.");
                CategoryServiceResponse response = new CategoryServiceResponse();
                List<String> categories = new ArrayList<>();
                categories.add("Food");
                categories.add("Fruits");
                response.setCatgories(categories);
                // reply to whoever sent the request
                getSender().tell(response, getSelf());
            } else if (msg instanceof String && msg.equals("init")) {
                System.out.println("init called");
            } else {
                System.out.println("Unhandelled message received for getCategories.");
            }
        }
    }
Subscriber's application.conf
    akka {
      loglevel = INFO
      stdout-loglevel = INFO
      loggers = ["akka.event.slf4j.Slf4jLogger"]
      extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]

      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }

      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }

      cluster {
        seed-nodes = [
          "akka.tcp://mynamespace-actor-system@127.0.0.1:2551",
          "akka.tcp://mynamespace-actor-system@127.0.0.1:2552"]
        auto-down-unreachable-after = 10s
      }
    }
Publisher's main class: Application.java
    package com.mynamespace;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.ComponentScan;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.contrib.pattern.DistributedPubSubExtension;
    import akka.contrib.pattern.DistributedPubSubMediator;

    import com.mynamespace.actors.PublisherActor;

    @SpringBootApplication
    @ComponentScan(basePackages = "com.mynamespace.*")
    public class Application {

        public static void main(String[] args) throws InterruptedException {
            ApplicationContext ctx = SpringApplication.run(Application.class, args);

            // get hold of the actor system
            ActorSystem system = ctx.getBean(ActorSystem.class);
            ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
            ActorRef publisher = system.actorOf(Props.create(PublisherActor.class), "publisher");

            // register the publisher with the mediator under its path (/user/publisher)
            mediator.tell(new DistributedPubSubMediator.Put(publisher), publisher);

            // wait a few seconds before triggering the request
            Thread.sleep(5000);
            publisher.tell("hi", publisher);
            System.out.println("Running.");
        }
    }
PublisherActor.java
    package com.mynamespace.actors;

    import scala.concurrent.Future;

    import akka.actor.ActorRef;
    import akka.actor.UntypedActor;
    import akka.contrib.pattern.DistributedPubSubExtension;
    import akka.contrib.pattern.DistributedPubSubMediator;
    import akka.dispatch.Mapper;
    import akka.pattern.Patterns;
    import akka.util.Timeout;

    import com.mynamespace.message.CategoryServiceRequest;
    import com.mynamespace.message.CategoryServiceResponse;

    public class PublisherActor extends UntypedActor {

        // activate the extension
        ActorRef mediator = DistributedPubSubExtension.get(getContext().system()).mediator();

        @Override
        public void onReceive(Object msg) {
            if (msg instanceof String) {
                Timeout timeOut = new Timeout(50000L);

                // tell: send the request to /user/subscriber through the mediator
                mediator.tell(
                        new DistributedPubSubMediator.Send("/user/subscriber", new CategoryServiceRequest()),
                        getSelf());

                // ask: send the same request again and wait for the reply
                Future<Object> response = Patterns.ask(
                        mediator,
                        new DistributedPubSubMediator.Send("/user/subscriber", new CategoryServiceRequest()),
                        timeOut);

                Future<CategoryServiceResponse> finalresponse = response.map(
                        new Mapper<Object, CategoryServiceResponse>() {
                            @Override
                            public CategoryServiceResponse apply(Object parameter) {
                                CategoryServiceResponse responseFromRemote = (CategoryServiceResponse) parameter;
                                System.out.println("received:: list of size:: "
                                        + responseFromRemote.getCatgories().size());
                                return responseFromRemote;
                            }
                        }, getContext().system().dispatcher());
            } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
                System.out.println("subscribbed.......");
            } else {
                unhandled(msg);
            }
        }
    }
The publisher's application.conf is the same as the subscriber's. Both run on different ports on the same machine.
I have defined two seed nodes and both are running on the local machine. Still, I cannot ask or tell the subscriber from the publisher (each running on a different node) through the DistributedPubSub mediator.
I start the subscriber first and then the publisher. No exceptions or dead-letter messages are printed to stdout or the logs.
Is there a way to see which actors are registered with my mediator?
I need help finding the problem, or at least the likely causes.
Solution
I ran into the same problem. Based on @spam's comment and my own experiments, I can recommend using Publish/Subscribe with a group and sendOneMessageToEachGroup = true.
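To make the recommendation concrete: as far as I understand the akka.contrib API, the subscriber side sends `DistributedPubSubMediator.Subscribe(topic, group, ref)` to its mediator, and the publisher sends `DistributedPubSubMediator.Publish(topic, msg, true)`. Each message is then delivered to one actor in every subscribing group, so with a single group it behaves much like Send. The sketch below is not Akka code. It is a toy, plain-Java model of that delivery rule only; the class and method names are made up for illustration, and the round-robin choice within a group is an assumption (Akka's actual routing logic is configurable).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not part of Akka) of one delivery per group. */
final class GroupRoutingSketch {

    // topic -> group -> subscriber names, in subscription order
    private final Map<String, Map<String, List<String>>> registry = new LinkedHashMap<>();
    // "topic/group" -> number of messages already routed to that group
    private final Map<String, Integer> routedCount = new LinkedHashMap<>();

    /** Models sending Subscribe(topic, group, ref) to the mediator. */
    void subscribe(String topic, String group, String subscriber) {
        registry.computeIfAbsent(topic, t -> new LinkedHashMap<>())
                .computeIfAbsent(group, g -> new ArrayList<>())
                .add(subscriber);
    }

    /** Models Publish(topic, msg, true): returns who receives one message. */
    List<String> publishOnePerGroup(String topic) {
        List<String> recipients = new ArrayList<>();
        Map<String, List<String>> groups = registry.getOrDefault(topic, new LinkedHashMap<>());
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            // Assumed policy: rotate through the members of each group.
            int index = routedCount.merge(topic + "/" + entry.getKey(), 1, Integer::sum) - 1;
            recipients.add(members.get(index % members.size()));
        }
        return recipients;
    }

    public static void main(String[] args) {
        GroupRoutingSketch mediator = new GroupRoutingSketch();
        mediator.subscribe("content", "workers", "subscriber-a");
        mediator.subscribe("content", "workers", "subscriber-b");
        mediator.subscribe("content", "auditors", "auditor-1");
        // Each call prints exactly one recipient per group.
        System.out.println(mediator.publishOnePerGroup("content"));
        System.out.println(mediator.publishOnePerGroup("content"));
    }
}
```

In the question's setup this would mean that every SubscriberActor subscribes to one topic under the same group name, and the PublisherActor publishes its CategoryServiceRequest to that topic instead of sending it to the path /user/subscriber.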
Is Send supposed to work only locally? If so, the documentation does not say so explicitly. I can also tell from the code that this part of the documentation has evidently been neglected: the class names in the example were changed, but the renamed classes are never invoked, while the ones from the previous example still are.
I hope this helps anyone who runs into this problem, because the documentation is clearly somewhat misleading.