Runners¶
Runners are small classes whose only purpose is to process jobs, one by one.
Shepherd provides shepherd.runner.BaseRunner to inherit your runners from.
Any runner may be invoked with the shepherd-runner command.
shepherd-runner¶
usage: shepherd-runner
       [-h]
       [-p PORT]
       [-s STREAM]
       [-r RUNNER]
       config_path
Positional Arguments¶
- config_path: emloop configuration file path
Named Arguments¶
- -p, --port: socket port to bind to (default: 9999)
- -s, --stream: dataset stream name (default: "predict")
- -r, --runner: fully qualified runner class (default: "shepherd.runner.JSONRunner")
Custom Runners¶
In most cases, you can just inherit from shepherd.runner.BaseRunner and override
shepherd.runner.BaseRunner._process_job().
This is exactly what is done by the shepherd.runner.JSONRunner class.
import json
import logging
from collections import defaultdict
from os import path

def _process_job(self, input_path: str, output_path: str) -> None:   # simplified
    # load the JSON payload from the job's input file
    with open(path.join(input_path, 'input')) as input_file:
        payload = json.load(input_file)
    result = defaultdict(list)
    for input_batch in self._get_stream(payload):
        logging.info('Another batch (%s)', list(input_batch.keys()))
        output_batch = self._model.run(input_batch, train=False, stream=None)
        if hasattr(self._dataset, 'postprocess_batch'):
            logging.info('\tPostprocessing')
            result_batch = self._dataset.postprocess_batch(input_batch=input_batch,
                                                           output_batch=output_batch)
            logging.info('\tdone')
        else:
            logging.info('Skipping postprocessing')
            result_batch = output_batch
        # accumulate the batch values per data source
        for source, value in result_batch.items():
            result[source] += list(value)
    logging.info('JSONify')
    result_json = to_json_serializable(result)
    with open(path.join(output_path, 'output'), 'w') as output_file:
        json.dump(result_json, output_file)
JSONRunner simply loads the JSON payload from the job's inputs/input file, creates a dataset stream
from it, and writes the accumulated output batches to outputs/output.
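The _process_job contract boils down to: read the job's input from input_path, do the work, and write the result under output_path. The sketch below illustrates that contract with a toy runner; BaseRunnerStub and UppercaseRunner are hypothetical names used only for this example (a real runner would subclass shepherd.runner.BaseRunner and typically delegate to a model, as JSONRunner does above).

```python
import json
import os
import tempfile

class BaseRunnerStub:
    """Illustrative stand-in for shepherd.runner.BaseRunner (not the real class)."""
    pass

class UppercaseRunner(BaseRunnerStub):
    """Toy runner: reads the job's input JSON and upper-cases its string values."""

    def _process_job(self, input_path: str, output_path: str) -> None:
        # read the JSON payload from <input_path>/input
        with open(os.path.join(input_path, 'input')) as input_file:
            payload = json.load(input_file)
        # "process" the job: upper-case every string in every source
        result = {source: [value.upper() for value in values]
                  for source, values in payload.items()}
        # write the result to <output_path>/output
        with open(os.path.join(output_path, 'output'), 'w') as output_file:
            json.dump(result, output_file)

# usage: simulate a job directory with inputs/ and outputs/ subdirectories
with tempfile.TemporaryDirectory() as job_dir:
    inputs = os.path.join(job_dir, 'inputs')
    outputs = os.path.join(job_dir, 'outputs')
    os.makedirs(inputs)
    os.makedirs(outputs)
    with open(os.path.join(inputs, 'input'), 'w') as f:
        json.dump({'texts': ['hello', 'world']}, f)
    UppercaseRunner()._process_job(inputs, outputs)
    with open(os.path.join(outputs, 'output')) as f:
        print(json.load(f))  # {'texts': ['HELLO', 'WORLD']}
```

Because all job-specific state arrives through the two path arguments, a runner like this stays trivial to unit-test with a temporary directory, as shown above.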