Runners

Runners are small classes whose only purpose is to process jobs, one by one. Shepherd provides shepherd.runner.BaseRunner to inherit your runners from.

All runners may be invoked with the shepherd-runner command.

shepherd-runner

usage: shepherd-runner
       [-h]
       [-p PORT]
       [-s STREAM]
       [-r RUNNER]
       config_path

Positional Arguments

config_path

emloop configuration file path

Named Arguments

-p, --port

Socket port to bind to

Default: 9999

-s, --stream

Dataset stream name

Default: “predict”

-r, --runner

Fully qualified runner class

Default: “shepherd.runner.JSONRunner”
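For illustration, the arguments above could be combined as follows. The configuration path config.yaml and the module path my_project.runners.MyRunner are hypothetical placeholders, not names from Shepherd itself:

```shell
# run the default JSONRunner on the default port with an emloop config
shepherd-runner config.yaml

# run a custom runner class on port 9999, reading the "predict" stream
shepherd-runner -p 9999 -s predict -r my_project.runners.MyRunner config.yaml
```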

Custom Runners

In most cases, you can just inherit from shepherd.runner.BaseRunner and override shepherd.runner.BaseRunner._process_job().

This is exactly what the shepherd.runner.JSONRunner class does:

def _process_job(self, input_path: str, output_path: str) -> None:   # simplified
    # load the job payload from the input file
    with open(path.join(input_path, 'input')) as input_file:
        payload = json.load(input_file)
    result = defaultdict(list)
    for input_batch in self._get_stream(payload):
        logging.info('Another batch (%s)', list(input_batch.keys()))
        output_batch = self._model.run(input_batch, train=False, stream=None)
        if hasattr(self._dataset, 'postprocess_batch'):
            logging.info('\tPostprocessing')
            result_batch = self._dataset.postprocess_batch(input_batch=input_batch,
                                                           output_batch=output_batch)
            logging.info('\tdone')
        else:
            logging.info('Skipping postprocessing')
            result_batch = output_batch

        # accumulate the batch values per data source
        for source, value in result_batch.items():
            result[source] += list(value)

    logging.info('JSONify')
    result_json = to_json_serializable(result)
    with open(path.join(output_path, 'output'), 'w') as output_file:
        json.dump(result_json, output_file)

JSONRunner simply loads JSON from the inputs/input file, creates a dataset stream from it, and writes the output batches to outputs/output.
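The job directory round trip described above can be reproduced with nothing but the standard json module. The inputs/ and outputs/ layout follows the description above, while the 'images' payload key is made up for illustration:

```python
import json
import os
import tempfile

# a throw-away job directory with the inputs/outputs layout described above
job_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(job_dir, 'inputs'))
os.makedirs(os.path.join(job_dir, 'outputs'))

# the payload is arbitrary JSON; the 'images' key is just an example
payload = {'images': ['base64-data-would-go-here']}
with open(os.path.join(job_dir, 'inputs', 'input'), 'w') as f:
    json.dump(payload, f)

# ... shepherd-runner would process the job here and write outputs/output ...

# the input payload survives the JSON round trip unchanged
with open(os.path.join(job_dir, 'inputs', 'input')) as f:
    assert json.load(f) == payload
```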