tf.data.experimental.service.DispatchServer

An in-process tf.data service dispatch server.

A tf.data.experimental.service.DispatchServer coordinates a cluster of tf.data.experimental.service.WorkerServers. When the workers start, they register themselves with the dispatcher.

import tensorflow as tf

dispatcher = tf.data.experimental.service.DispatchServer()
# Workers take the dispatcher address without the "protocol://" prefix.
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

When running a dedicated tf.data dispatch process, call join() after starting the server to block until the server terminates.

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()
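A dedicated dispatcher process is usually a small script. The sketch below assumes the port comes from a command-line flag; build_arg_parser and run_dispatcher are hypothetical names, not part of the tf.data API. Only DispatchServer, DispatcherConfig, and join() come from this page.

```python
import argparse


def build_arg_parser() -> argparse.ArgumentParser:
    """Hypothetical CLI for a dedicated tf.data dispatch process."""
    parser = argparse.ArgumentParser(
        description="Run a tf.data service dispatcher until it terminates.")
    parser.add_argument("--port", type=int, default=5050,
                        help="Port the dispatcher listens on.")
    return parser


def run_dispatcher(port: int) -> None:
    """Starts a dispatcher on `port` and blocks until it shuts down."""
    # Imported inside the function so the argument parser above can be
    # reused without importing TensorFlow.
    import tensorflow as tf

    dispatcher = tf.data.experimental.service.DispatchServer(
        tf.data.experimental.service.DispatcherConfig(port=port))
    # join() blocks until the server terminates.
    dispatcher.join()
```

From the script's entry point, run_dispatcher(build_arg_parser().parse_args().port) starts the dispatcher on the requested port and keeps the process alive until the server stops.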

Call stop() to gracefully terminate the dispatcher. The server automatically stops when all references to it have been deleted.
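Rather than relying on the last reference being deleted, you can tie stop() to a with block so shutdown happens at a predictable point. A minimal sketch, assuming that pattern fits your program: stopped_on_exit is a hypothetical helper built on contextlib, not part of tf.data, and it works with any object that has a stop() method.

```python
import contextlib
from typing import Iterator, Protocol, TypeVar


class _Stoppable(Protocol):
    """Structural type: any object with a stop() method, e.g. a DispatchServer."""

    def stop(self) -> None: ...


ServerT = TypeVar("ServerT", bound=_Stoppable)


@contextlib.contextmanager
def stopped_on_exit(server: ServerT) -> Iterator[ServerT]:
    """Hypothetical helper: yields `server`, then calls server.stop() on exit.

    stop() runs whether the with block finishes normally or raises.
    """
    try:
        yield server
    finally:
        server.stop()
```

For example, with stopped_on_exit(tf.data.experimental.service.DispatchServer()) as dispatcher: stops the dispatcher as soon as the block exits, even if the body raises.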

To start a DispatchServer in fault-tolerant mode, set work_dir and fault_tolerant_mode as shown below:

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        work_dir="gs://my-bucket/dispatcher/work_dir",
        fault_tolerant_mode=True))
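Fault-tolerant mode is configured by setting work_dir and fault_tolerant_mode together. If those values come from flags or the environment, a small pre-flight check can catch a missing work_dir before the server is built. This is a sketch under that assumption: fault_tolerant_dispatcher_kwargs is a hypothetical helper, and its error message is illustrative, not TensorFlow's.

```python
from typing import Any, Dict


def fault_tolerant_dispatcher_kwargs(port: int, work_dir: str) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments for a fault-tolerant DispatcherConfig.

    Refuses to produce a config that enables fault_tolerant_mode
    without a work_dir.
    """
    if not work_dir:
        raise ValueError("fault_tolerant_mode=True requires a non-empty work_dir")
    return {"port": port, "work_dir": work_dir, "fault_tolerant_mode": True}
```

The result is meant to be unpacked into the config, for example tf.data.experimental.service.DispatcherConfig(**fault_tolerant_dispatcher_kwargs(5050, "gs://my-bucket/dispatcher/work_dir")).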

Args
config: (Optional.) A tf.data.experimental.service.DispatcherConfig configuration. If None, the dispatcher will use default configuration values.
start: (Optional.) Boolean, indicating whether to start the server after creating it. Defaults to True.

Attributes
target: Returns a target that can be used to connect to the server.

dispatcher = tf.data.experimental.service.DispatchServer()
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))

The returned string will be in the form protocol://address, e.g. "grpc://localhost:5050".
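Workers need only the address part of this string; the class example strips the protocol with target.split("://")[1]. A slightly stricter sketch, assuming you would rather reject a malformed target with a clear error than get an IndexError: split_target is a hypothetical helper, not part of tf.data.

```python
from typing import Tuple


def split_target(target: str) -> Tuple[str, str]:
    """Hypothetical helper: splits "protocol://address" into (protocol, address)."""
    protocol, sep, address = target.partition("://")
    # Require the separator and a non-empty value on both sides.
    if not sep or not protocol or not address:
        raise ValueError(f"expected 'protocol://address', got {target!r}")
    return protocol, address
```

For "grpc://localhost:5050" this returns ("grpc", "localhost:5050"); the second element is the value a WorkerConfig expects as dispatcher_address.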

Methods

join

Blocks until the server has shut down.

This is useful when running a dedicated dispatch process.

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()

Raises
tf.errors.OpError: Or one of its subclasses if an error occurs while joining the server.

start

Starts this server.

dispatcher = tf.data.experimental.service.DispatchServer(start=False)
dispatcher.start()

Raises
tf.errors.OpError: Or one of its subclasses if an error occurs while starting the server.

stop

Stops the server.

Raises
tf.errors.OpError: Or one of its subclasses if an error occurs while stopping the server.