Florian Hopf / @fhopf

Ein Beispiel:

Einfacher Web Crawler in Java

Sequentiell

SequentialExecution.java


/**
 * Crawls pages one after another, starting at {@code path}, and hands every
 * fetched page to an indexer.
 *
 * <p>For each page returned by the page store the content is fetched, the
 * links reported by the page content are added back to the store, the content
 * is indexed and the page is marked as finished. The loop stops as soon as the
 * store's {@code getNext()} returns {@code null}; the indexer is committed
 * once afterwards.
 *
 * @param path   start location; also passed to the page retriever's constructor
 * @param writer index writer wrapped by the {@code IndexerImpl} used here
 */
public void downloadAndIndex(String path, IndexWriter writer) {
    Indexer pageIndexer = new IndexerImpl(writer);
    PageRetriever pageRetriever = new HtmlParserPageRetriever(path);

    // The store is seeded with the start page only.
    VisitedPageStore visitedPages = new VisitedPageStore();
    visitedPages.add(path);

    for (String current = visitedPages.getNext();
            current != null;
            current = visitedPages.getNext()) {
        PageContent content = pageRetriever.fetchPageContent(current);
        // Newly found links are registered before the page itself is indexed.
        visitedPages.addAll(content.getLinksToFollow());
        pageIndexer.index(content);
        visitedPages.finished(current);
    }

    // Single commit after the whole crawl.
    pageIndexer.commit();
}
					

Zeitmessung


5404 [main] INFO de.fhopf.akka.Executor - Found 77 results
                    

(Kein echtes Benchmarking)

Scale Up

Synchronize and Suffer

Scala

Actor-Toolkit

Schon wieder ein neuer Hype?

A universal modular ACTOR formalism for artificial intelligence

IJCAI'73: Proceedings of the 3rd International Joint Conference on Artificial Intelligence

Anspieltipp: Bret Victor - The Future of Programming

Message Passing Concurrency

SimpleActorExecution.java


// Bootstrap: create the actor system and a single master actor on top of it.
ActorSystem actorSystem = ActorSystem.create();

// Constructor arguments for SimpleActorMaster: the page retriever and the index writer.
HtmlParserPageRetriever retriever = new HtmlParserPageRetriever(path);
Props masterProps = Props.create(SimpleActorMaster.class, retriever, writer);
ActorRef master = actorSystem.actorOf(masterProps);

// Kick off the crawl by sending the start path; the system guardian is
// passed as the second argument of tell().
master.tell(path, actorSystem.guardian());

// Wait here until the actor system has terminated.
actorSystem.awaitTermination();
                    

SimpleActorMaster.java


/**
 * Master actor that owns two child actors: one created from
 * {@code PageParsingActor} and one created from {@code IndexingActor}.
 */
class SimpleActorMaster extends UntypedActor {

    /** Child actor created from {@code IndexingActor}. */
    private final ActorRef indexer;
    /** Child actor created from {@code PageParsingActor}. */
    private final ActorRef parser;

    /**
     * Creates both child actors in this actor's context.
     *
     * @param pageRetriever constructor argument for the {@code PageParsingActor}
     * @param indexWriter   wrapped in an {@code IndexerImpl}, which becomes the
     *                      constructor argument for the {@code IndexingActor}
     */
    public SimpleActorMaster(final PageRetriever pageRetriever,
                            final IndexWriter indexWriter) {
        // The parsing child is created first, the indexing child second.
        final Props parserProps = Props.create(PageParsingActor.class, pageRetriever);
        parser = getContext().actorOf(parserProps);

        final Props indexerProps =
                Props.create(IndexingActor.class, new IndexerImpl(indexWriter));
        indexer = getContext().actorOf(indexerProps);
    }

    // more to come ...
}
                    

PageParsingActor.java


/**
 * Fetches the page content for a {@code String} message and sends the result
 * back to the sender. Messages of any other type are ignored.
 *
 * @param o incoming message
 * @throws Exception declared by the signature; nothing is caught here
 */
public void onReceive(Object o) throws Exception {
    if (!(o instanceof String)) {
        // Not a page location: nothing to do.
        return;
    }

    String location = (String) o;
    PageContent fetched = pageRetriever.fetchPageContent(location);
    getSender().tell(fetched, getSelf());
}
                    

IndexingActor.java


/**
 * Handles the two message kinds this actor reacts to:
 * <ul>
 *   <li>a {@code PageContent} is indexed and acknowledged to the sender with
 *       an {@code IndexedMessage} carrying the content's path;</li>
 *   <li>the {@code COMMIT_MESSAGE} instance (compared by identity) triggers a
 *       commit and is answered with {@code COMMITTED_MESSAGE}.</li>
 * </ul>
 * Anything else is ignored.
 *
 * @param o incoming message
 * @throws Exception declared by the signature; nothing is caught here
 */
public void onReceive(Object o) throws Exception {
    if (o instanceof PageContent) {
        PageContent page = (PageContent) o;
        indexer.index(page);
        getSender().tell(new IndexedMessage(page.getPath()), getSelf());
        return;
    }

    if (o == COMMIT_MESSAGE) {
        indexer.commit();
        getSender().tell(COMMITTED_MESSAGE, getSelf());
    }
}
                    

SimpleActorMaster.java


/**
 * Coordinates parsing and indexing based on the type of the incoming message.
 *
 * <ul>
 *   <li>{@code String}: added to the page store; the store's next page is
 *       sent to the parser.</li>
 *   <li>{@code PageContent}: forwarded to the indexer and its links are added
 *       to the store. If the store then reports it is finished, a commit is
 *       requested; otherwise the next batch of pages goes to the parser.</li>
 *   <li>{@code IndexedMessage}: its path is marked finished; if the store then
 *       reports it is finished, a commit is requested.</li>
 *   <li>{@code IndexingActor.COMMITTED_MESSAGE} (compared by identity): the
 *       actor system is shut down.</li>
 * </ul>
 * Any other message is ignored.
 *
 * @param message incoming message
 * @throws Exception declared by the signature; nothing is caught here
 */
public void onReceive(Object message) throws Exception {

    if (message instanceof String) {
        visitedPageStore.add((String) message);
        getParser().tell(visitedPageStore.getNext(), getSelf());
        return;
    }

    if (message instanceof PageContent) {
        PageContent parsed = (PageContent) message;
        // Indexing is requested first, then the discovered links are registered.
        getIndexer().tell(parsed, getSelf());
        visitedPageStore.addAll(parsed.getLinksToFollow());

        if (!visitedPageStore.isFinished()) {
            for (String next : visitedPageStore.getNextBatch()) {
                getParser().tell(next, getSelf());
            }
            return;
        }
        getIndexer().tell(IndexingActor.COMMIT_MESSAGE, getSelf());
        return;
    }

    if (message instanceof IndexedMessage) {
        IndexedMessage indexed = (IndexedMessage) message;
        visitedPageStore.finished(indexed.path);

        if (visitedPageStore.isFinished()) {
            getIndexer().tell(IndexingActor.COMMIT_MESSAGE, getSelf());
        }
        return;
    }

    if (IndexingActor.COMMITTED_MESSAGE == message) {
        // The commit has been acknowledged: stop the whole actor system.
        getContext().system().shutdown();
    }
}
                    

Zeitmessung


5847 [main] INFO de.fhopf.akka.Executor - Found 77 results                    
                    

Go Parallel

ParallelMaster.java


public ParallelMaster(final Indexer indexer, final PageRetriever pageRetriever) {
    parser = getContext().actorOf(Props.create(PageParsingActor.class, pageRetriever)
            .withRouter(new RoundRobinRouter(10)).withDispatcher("worker-dispatcher"));
    indexingActor = getContext().actorOf(Props.create(IndexingActor.class, indexer));
}
                    

Zeitmessung


2315 [main] INFO de.fhopf.akka.Executor - Found 77 results                    
                    

Fault Tolerance

Aktoren werden von ihren Eltern überwacht (Supervision)

Aktionen

  • Resume
  • Restart
  • Stop
  • Escalate

Skalierbarkeit

Fehlertoleranz

Wartbarkeit

Ressourcen

Ressourcen