This example uses the same basic ROI detection workflow as shown in the single-process sample, but uses Myriad’s Akka-based concurrency to perform the work across multiple cores.

When the application is started, an ROI processing pipeline is constructed. If the included model finds what appears to be an indication of structural damage in the sensor data, it logs an entry of the form:

[MyriadDemo-...] INFO c.e.myriad.network.ReporterActor - Received ROI report from Actor[akka://MyriadDemo...]: ** FLAW FOUND ** &source=...

A complete project bundle, including a pre-trained model and sample input files, is available. The source code for the example is reprinted below. Note that this demo code is set up as an always-running server application: if files are provided on the command line it will ingest them and search for flaws, but in either case it continues to run afterwards. You can send new files for inspection to this application by retrieving the path to the ingestor pool and sending it a new FileMessage from another Myriad-based application.

/*
 * com.emphysic.myriad.demo.MyriadDemo
 *
 * Copyright (c) 2016 Emphysic LLC.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.emphysic.myriad.demo;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import com.emphysic.myriad.core.data.ops.GaussianBlur;
import com.emphysic.myriad.core.data.ops.GaussianPyramidOperation;
import com.emphysic.myriad.core.data.ops.SobelOperation;
import com.emphysic.myriad.core.data.roi.PassiveAggressiveROIFinder;
import com.emphysic.myriad.core.data.roi.ROIFinder;
import com.emphysic.myriad.network.*;
import com.emphysic.myriad.network.messages.FileMessage;
import com.emphysic.myriad.network.messages.ShutdownMessage;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates a distributed ROI processing pipeline.  Analogous to the single-process PipelineDemo example.
 *
 * 1. Ingest data
 * 2. Run a Gaussian Pyramid operation on the input for scale invariance
 * 2a. (Optional) perform any preprocessing desired on each step in the pyramid in 2
 * 3. For each step in the pyramid in 2(a), run a sliding window operation
 * 4. For each sliding window in 3, ask an ROI detector whether the window contains ROI or not
 * 5. Compile the results
 *
 * Created by ccoughlin on 10/25/16.
 */
@Slf4j
public class MyriadDemo {
    /**
     * Main Akka system
     */
    private ActorSystem system;
    /**
     * Akka configuration
     */
    private final Config config;
    /**
     * Reference to the start of the pipeline, a data ingestion pool
     */
    private ActorRef ingestor;
    /**
     * Number of workers in the data ingestion stage (defaults to 1).
     */
    private int numIngestors = 1;
    /**
     * Number of Gaussian pyramid workers in the scaling stage (defaults to 2).
     */
    private int numScalers = 2;
    /**
     * Number of preprocessor workers in the preprocessing stage (defaults to 2).
     */
    private int numPreprocessors = 2;
    /**
     * Number of sliding window workers in the sliding window stage (defaults to 4).
     */
    private int numSliders = 4;
    /**
     * Number of ROI detector workers in the ROI detection stage (defaults to 8).
     */
    private int numDetectors = 8;
    /**
     * Number of ROI reporting workers in the reporting stage (defaults to 8).
     */
    private int numReporters = 8;
    /**
     * Radius of blur operation in scaling stage (defaults to 5).
     */
    private int blurRadius = 5;
    /**
     * Ratio of scales between subsequent pyramid steps in the scaling stage (defaults to 2 i.e. each step is 1/2
     * the size of the previous step).
     */
    private int scaleFactor = 2;
    /**
     * Cutoff size in scaling stage (defaults to 1 i.e. when any dimension of the current step is only 1 element pyramid
     * is complete).
     */
    private int scaleLimit = 1;
    /**
     * Width in points of the sliding window (defaults to 15).
     */
    private int windowWidth = 15;
    /**
     * Height in points of the sliding window (defaults to 15).
     */
    private int windowHeight = 15;
    /**
     * Step size in points of the sliding window (defaults to 5 i.e. a window is taken every 5 points).
     */
    private int windowStep = 5;
    /**
     * The ROI finder to use
     */
    private ROIFinder roiFinder;

    public MyriadDemo(Config config) {
        this.config = config;
        system = ActorSystem.create("MyriadDemo", config);
    }

    /**
     * Constructs the processing pipeline.
     * @return true if configuration of each stage was successful, false otherwise.
     */
    public boolean startup() {
        try {
            log.info("Creating processing pipeline");
            ingestor = system.actorOf(Props.create(DataIngestorPool.class, numIngestors), "IngestorPool");
            log.info("Ingestor pool of " + numIngestors + " workers created");
            ActorRef scaler = system.actorOf(
                    Props.create(PyramidActorPool.class,
                            numScalers,
                            new GaussianPyramidOperation(
                                    new GaussianBlur(blurRadius),
                                    scaleFactor,
                                    scaleLimit)
                    ),
                    "ScalerPool"
            );
            log.info("Scale space pool of " + numScalers + " workers created");
            ActorRef preprocessor = system.actorOf(
                    Props.create(DatasetOperationPool.class, numPreprocessors, new SobelOperation()),
                    "PreprocessorPool"
            );
            log.info("Preprocessor pool of " + numPreprocessors + " workers created");
            ActorRef slider = system.actorOf(
                    Props.create(SlidingWindowPool.class,
                            numSliders,
                            windowStep,
                            windowWidth,
                            windowHeight
                    ),
                    "SliderPool"
            );
            log.info("Slider pool of " + numSliders + " workers created");
            ActorRef roidetection = system.actorOf(
                    Props.create(ROIFinderPool.class,
                            numDetectors,
                            loadROIFinder()
                    ),
                    "ROIFinderPool"
            );
            log.info("ROI detector pool of " + numDetectors + " workers created");
            ActorRef reporting = system.actorOf(
                    Props.create(ReporterActorPool.class,
                            numReporters
                    ),
                    "ReportingPool"
            );
            log.info("Reporting pool of " + numReporters + " workers created");

            // Connect the processing stages
            log.info("Connecting pipeline stages");
            ActorRef guardian = system.guardian();
            ingestor.tell(scaler, guardian);
            scaler.tell(preprocessor, guardian);
            preprocessor.tell(slider, guardian);
            slider.tell(roidetection, guardian);
            roidetection.tell(reporting, guardian);
            log.info("Pipeline constructed, ready to receive inputs");
            return true;
        } catch (IllegalAccessException | IOException | InstantiationException ioe) {
            log.error("Unable to read model file, error was: " + ioe.getMessage());
        } catch (Exception e) {
            log.error("An error occurred constructing the pipeline: " + e.getMessage());
        }
        return false;
    }

    /**
     * Adds a file for processing
     * @param f name of file to ingest
     */
    public void ingest(File f) {
        if (ingestor != null) {
            log.info("Sending " + f + " through pipeline");
            ingestor.tell(new FileMessage(f), system.guardian());
        } else {
            log.error("No ingestor configured - are you sure you called startup() ?");
        }
    }

    /**
     * Adds a file for processing
     * @param f pathname of file to ingest
     */
    public void ingest(String f) {
        ingest(new File(f));
    }

    /**
     * Convenience method for loading the C-scan damage detection model.
     * @return model instance
     * @throws InstantiationException error instantiating the ROIFinder (abstract, interface, etc.)
     * @throws IllegalAccessException constructor isn't accessible
     * @throws IOException if an I/O error occurs reading the input file
     */
    private static ROIFinder loadROIFinder() throws IllegalAccessException, IOException, InstantiationException {
        URL earl = Thread.currentThread().getContextClassLoader().getResource("models/sobel_cscan_model.myr");
        File modelFile = new File(earl.getPath());
        assert (modelFile.exists());
        return ROIFinder.fromFile(modelFile, PassiveAggressiveROIFinder.class);
    }

    /**
     * Demonstration of running - ingests input files from the command line to search for ROI
     * @param args list of files to search
     * @throws Exception if an error occurs
     */
    public static void main(String[] args) throws Exception {
        Config config = ConfigFactory.load();
        MyriadDemo demo = new MyriadDemo(config);
        boolean ready = demo.startup();
        if (ready) {
            for (String input : args) {
                demo.ingest(input);
            }
        } else {
            System.out.println("Unable to construct pipeline, please check log files for further details.");
        }
    }
}