Advanced: Streaming & Interactive jobs

In several cases it is desirable to stream data to and from interactive jobs, as well as to a remote filesystem. The GRPC API has built-in support for asynchronous streaming through many simultaneous requests. In Python this API is exposed in terms of generators.
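
As a rough illustration (not tied to a specific PyXenon call), any generator that yields bytes objects can act as such a request stream:

def byte_chunks():
    # each yielded bytes object becomes one chunk in a streamed request
    for text in ("hello, ", "world\n"):
        yield text.encode()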

Example: an interactive job

In this example we’ll show how to obtain bi-directional communication with an interactive job. An interactive job is started with Scheduler.submit_interactive_job().

Streaming input, a.k.a. The Halting Problem

We need to stream input to the interactive job. In the Quick Start, we saw that we could send data to a stream by simply giving a list of bytes objects. Here we aim a bit higher and play a kind of real-time ping-pong with a remote process. We need to provide PyXenon with a generator that pulls its messages from a queue. The GRPC module ensures that this generator is run asynchronously from the main thread.

The tricky part is that we need to be able to tell the generator when the work is done and no more input is to be expected. We could have it receive strings and check for end-of-file messages in some way, but in essence we’ll always have to define a little protocol to deal with the finiteness of the generator’s life. To make this explicit we define a little 2-tuple micro-language:

message                       action
('msg', <value: string>)      yield value.encode()
('end', None)                 return

Implementing this:

from queue import Queue

def make_input_stream():
    input_queue = Queue()

    def input_stream():
        # pull ('msg', value) tuples from the queue until an 'end'
        # message arrives, then stop the generator
        while True:
            cmd, value = input_queue.get()
            if cmd == 'end':
                input_queue.task_done()
                return
            elif cmd == 'msg':
                yield value.encode()
                input_queue.task_done()

    return input_queue, input_stream
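
The queue/generator pair can be exercised on its own, without Xenon; this short check (ours, not part of the example) shows that the generator stops cleanly once the ('end', None) message is queued:

input_queue, input_stream = make_input_stream()
input_queue.put(('msg', 'hello'))
input_queue.put(('end', None))

for chunk in input_stream():
    print(chunk)            # prints b'hello', then the generator returns

input_queue.join()          # returns, since every message was marked done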

Reading output

The return value of submit_interactive_job() is an iterator yielding objects of type SubmitInteractiveJobResponse. These objects have a stdout field containing (binary) data that the job wrote to standard output, as well as a stderr field containing data written to standard error. Either field may be empty in any given message. In this example we’re only interested in data from stdout:

def get_stdout(stream):
    # take the next response from the output stream and decode its stdout
    return next(stream).stdout.decode()
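
If the data written to standard error is also of interest, a slightly broader helper could return both channels (a sketch; get_output is our own name):

def get_output(stream):
    # take the next response and decode both output channels;
    # either one may be an empty string
    msg = next(stream)
    return msg.stdout.decode(), msg.stderr.decode()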

The “remote” script

For the purpose of this example, we have defined a small Python rot13 program:

rot13.py
import codecs

# echo every line on standard input back, rot13-encoded, until EOF
try:
    while True:
        line = input()
        print(codecs.encode(line, 'rot_13'))

except EOFError:
    pass
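
Since ROT13 is its own inverse, the script’s behaviour is easy to check locally without Xenon:

import codecs

# applying rot13 twice gives back the original text
line = "Zlfgvp aboyr tnf,"
assert codecs.encode(codecs.encode(line, 'rot_13'), 'rot_13') == line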

Defining the job

Interactive job descriptions are the same as normal job descriptions.

# our input lines
input_lines = [
    "Zlfgvp aboyr tnf,",
    "Urnil lrg syrrgvat sebz tenfc,",
    "Oyhr yvxr oheavat vpr."
]

# the job description, make sure you run the script from the examples
# directory!
job_description = xenon.JobDescription(
    executable='python',
    arguments=['rot13.py'],
    queue_name='multi')

Putting it together

The rest is history.

import xenon

# start the xenon-grpc server
xenon.init()

# on the local adaptor
with xenon.Scheduler.create(adaptor='local') as scheduler:
    input_queue, input_stream = make_input_stream()

    # submit an interactive job, this gets us the job-id and a stream
    # yielding job output from stdout and stderr.
    job, output_stream = scheduler.submit_interactive_job(
        description=job_description, stdin_stream=input_stream())

    # next we feed the input_queue with messages
    try:
        for line in input_lines:
            print(" [sending]   " + line)
            input_queue.put(('msg', line + '\n'))
            msg = get_stdout(output_stream)
            print("[received]   " + msg)

    # make sure to close our end whatever may happen
    finally:
        input_queue.put(('end', None))
        input_queue.join()

    scheduler.wait_until_done(job)
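
If no real-time ping-pong is needed, the queue and generator can be dropped entirely; as noted above, a stream can also be fed from a plain list of bytes objects. A minimal sketch of that variant (assuming submit_interactive_job() accepts a list for stdin_stream in the same way):

# non-interactive variant: all input is known up front, so we can skip the
# queue and pass a plain list of bytes objects as the stdin stream
stdin_data = [(line + '\n').encode() for line in input_lines]

with xenon.Scheduler.create(adaptor='local') as scheduler:
    job, output_stream = scheduler.submit_interactive_job(
        description=job_description, stdin_stream=stdin_data)

    # drain the output stream; each response may carry a chunk of stdout
    for response in output_stream:
        print(response.stdout.decode(), end='')

    scheduler.wait_until_done(job)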

Protocol definitions

It can be instructive to see what the GRPC protocol for interactive jobs looks like.

message SubmitInteractiveJobRequest {
    Scheduler scheduler = 1;
    JobDescription description = 2;
    bytes stdin = 3;
}

message SubmitInteractiveJobResponse {
    Job job = 1;
    bytes stdout = 2;
    bytes stderr = 3;
}

service SchedulerService {
    rpc submitInteractiveJob(
            stream SubmitInteractiveJobRequest)
        returns (stream SubmitInteractiveJobResponse) {}
}

In PyXenon the remote procedure call submitInteractiveJob is wrapped by the submit_interactive_job() method of the Scheduler class. Note that the SubmitInteractiveJobRequest specifies (next to the scheduler, which is obtained from self in the method call) the job description and bytes for standard input. Requests of this type are streamed, which means that GRPC expects to receive an iterator of SubmitInteractiveJobRequest objects.

The PyXenon submit_interactive_job() method separates the job-description and input-stream arguments, sending the scheduler and description fields in the first request, followed by a sequence of requests in which only the stdin field is specified. This latter sequence is yielded from the stdin_stream argument.
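
Conceptually, the request stream that the wrapper sends looks roughly like the sketch below, where plain dictionaries stand in for SubmitInteractiveJobRequest messages (an illustration, not PyXenon’s actual code):

def request_stream(scheduler, description, stdin_stream):
    # first request: identifies the scheduler and carries the job description
    yield {'scheduler': scheduler, 'description': description}

    # follow-up requests: only the stdin field is filled in
    for chunk in stdin_stream:
        yield {'stdin': chunk}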

Similarly, the first item in the output stream is guaranteed to contain only the job-id; this first item is available immediately. Subsequent calls to next(output_stream) will block until output is available. The submit_interactive_job() method takes the first item of the iterator and extracts the job-id. The user receives a tuple with the extracted job-id and the iterator.
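
The receiving side can be pictured in the same spirit: peel the first response off the iterator to obtain the job, and hand the remaining iterator back to the caller (again an illustration; split_response_stream is a name we introduce here):

def split_response_stream(responses):
    # the first response only carries the job; later responses carry
    # stdout/stderr data and arrive as the job produces output
    first = next(responses)
    return first.job, responses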