Runners¶
Runners are rather small classes whose only purpose is to process jobs, one by one.
Shepherd provides shepherd.runner.BaseRunner to inherit your runners from.
All runners may be invoked with the shepherd-runner command.
shepherd-runner¶
usage: shepherd-runner [-h] [-p PORT] [-s STREAM] [-r RUNNER] config_path
Positional Arguments¶
- config_path: emloop configuration file path
Named Arguments¶
- -p, --port: Socket port to bind to. Default: 9999
- -s, --stream: Dataset stream name. Default: “predict”
- -r, --runner: Fully qualified runner class. Default: “shepherd.runner.JSONRunner”
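For example, the following invocation starts the default JSONRunner on port 9999 with the predict stream (the configuration file path is only illustrative):

shepherd-runner -p 9999 -s predict -r shepherd.runner.JSONRunner path/to/config.yaml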
Custom Runners¶
In most cases, you can just inherit from shepherd.runner.BaseRunner and override shepherd.runner.BaseRunner._process_job().
This is exactly what the shepherd.runner.JSONRunner class does.
# imports used by the simplified snippet below
import json
import logging
from collections import defaultdict
from os import path

def _process_job(self, input_path: str, output_path: str) -> None:  # simplified
    # load the job payload from the `input` file in the job's inputs directory
    payload = json.load(open(path.join(input_path, 'input')))
    result = defaultdict(list)
    # create a dataset stream from the payload and run the model batch by batch
    for input_batch in self._get_stream(payload):
        logging.info('Another batch (%s)', list(input_batch.keys()))
        output_batch = self._model.run(input_batch, train=False, stream=None)
        # post-process the batch if the dataset provides `postprocess_batch`
        if hasattr(self._dataset, 'postprocess_batch'):
            logging.info('\tPostprocessing')
            result_batch = self._dataset.postprocess_batch(input_batch=input_batch,
                                                           output_batch=output_batch)
            logging.info('\tdone')
        else:
            logging.info('Skipping postprocessing')
            result_batch = output_batch
        # accumulate the results per data source
        for source, value in result_batch.items():
            result[source] += list(value)
    # convert the accumulated results to JSON-serializable types and write them out
    logging.info('JSONify')
    result_json = to_json_serializable(result)
    json.dump(result_json, open(path.join(output_path, 'output'), 'w'))
JSONRunner simply loads JSON from the inputs/input file, creates a stream from it and writes the output batches to outputs/output.
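To provide different behaviour, a custom runner only needs to override _process_job(). The sketch below is illustrative rather than taken from Shepherd: the PickleRunner class and its module are made up, while BaseRunner, _process_job(), self._get_stream() and self._model are assumed to behave as in the JSONRunner snippet above.

import json
import pickle
from os import path

from shepherd.runner import BaseRunner


class PickleRunner(BaseRunner):
    """Hypothetical runner that stores the raw output batches with pickle."""

    def _process_job(self, input_path: str, output_path: str) -> None:
        # read the job payload, assuming it arrives as JSON in inputs/input (as with JSONRunner)
        with open(path.join(input_path, 'input')) as payload_file:
            payload = json.load(payload_file)

        # run the model on the payload stream, batch by batch
        output_batches = [self._model.run(batch, train=False, stream=None)
                          for batch in self._get_stream(payload)]

        # pickle the raw output batches into the job's outputs directory
        with open(path.join(output_path, 'output'), 'wb') as output_file:
            pickle.dump(output_batches, output_file)

Such a runner would then be selected with the -r/--runner argument, e.g. -r my_runners.PickleRunner (a hypothetical module path), while the rest of the job handling stays in BaseRunner.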