最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

tensorflow2.0 - How can I make the worker execute the step_fn function instead of the chief in the parameterServerStrategy in Te

programmeradmin4浏览0评论

I am trying to implement the paramterServerStrategy in TensorFlow, but for some reason when I run the worker, it does not execute as the worker but as the chief.

This is the script of my worker

import os
import json
import tensorflow as tf

with open("cluster_spec.json", "r") as f:
    loaded_cluster_spec = json.load(f)

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": loaded_cluster_spec,
    "task": {"type": "worker", "index": 0}
})

print(f'Worker pid is {os.getpid()}')

print(loaded_cluster_spec)
server = tf.distribute.Server(
    loaded_cluster_spec,
    job_name='worker',
    task_index=0,
    protocol= 'grpc',
    start=True)
server.join()

This is the script of the parameterServer

import os
import json
import tensorflow as tf

with open("cluster_spec.json", "r") as f:
    loaded_cluster_spec = json.load(f)

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": loaded_cluster_spec,
    "task": {"type": "ps", "index": 0}
})

server = tf.distribute.Server(
    loaded_cluster_spec,
    job_name = 'ps',
    task_index = 0,
    protocol = 'grpc',
    start = True)
server.join()

And this is the code for the chief and the model

import time
import tensorflow as tf
import portpicker
import json
import os

#----------------------------------------Creating the workers and the paramterServer-------------------------------
def create_in_process_cluster_dict(num_workers, num_ps):
    """Creates and starts local servers and returns the cluster_resolver."""
    worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
    ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

    cluster_dict = {
        "worker": ["localhost:%s" % port for port in worker_ports],
        "ps": ["localhost:%s" % port for port in ps_ports],
        "chief": [f'localhost:{portpicker.pick_unused_port()}']
    }
    return cluster_dict
#------------------------------------------------------------------------------------------------------------------



#------------------------------------------Can you check these-----------------------------------------

tf_config_cluster_dict = create_in_process_cluster_dict(1, 1)

with open("cluster_spec.json", "w") as f:
    json.dump(tf_config_cluster_dict, f, indent=4)

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": tf_config_cluster_dict,
    "task": {"type": "chief", "index": 0}
})
print(f'Chiefs pid is {os.getpid()}')
strategy = tf.distribute.ParameterServerStrategy(
    cluster_resolver=tf.distribute.cluster_resolver.TFConfigClusterResolver()
)
coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)


#-----------------------------------------------------------------------------------------------------







with strategy.scope():
    def create_model():
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D((2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])
        return model

    model = create_model()
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    accuracy = tf.keras.metrics.SparseCategoricalAccuracy()

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
# Normalize the dataset
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add channel dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

BATCH_SIZE = 64

def dataset_fn(input_context):
    """Distributes dataset across workers."""
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_dataset = train_dataset.shuffle(10000).batch(BATCH_SIZE).repeat().prefetch(tf.data.experimental.AUTOTUNE)
    return train_dataset.shard(input_context.num_input_pipelines, input_context.input_pipeline_id)


with strategy.scope():
    global_step = tf.Variable(0, dtype=tf.int32, trainable=False)
    number_of_active_workers = tf.Variable(0, dtype=tf.int32, trainable=False)




#------------------------------ Look here -----------------------------------------


@tf.function
def step_fn(iterator):
    def worker_step(batch_data, labels):
        tf.print(f'my pid is {os.getpid()}')
        with tf.GradientTape() as tape:
            predictions = model(batch_data, training=True)
            per_example_loss = loss_object(labels, predictions)
            loss = tf.reduce_mean(per_example_loss)

        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        accuracy(labels, predictions)
        # Atomically update the global step count
        global_step.assign_add(1)
        return loss
    

    batch_data, labels = next(iterator)
    losses = strategy.run(worker_step, args=(batch_data, labels))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)


@tf.function
def per_worker_dataset_fn():
    #distribute_datasets_from_function to specify how the dataset should be distributed across replicas.
    return strategy.distribute_datasets_from_function(dataset_fn)


#----------------------------------------------------------------------------------------------------------




per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)


# # Train the model
steps = 4000
for _ in range(steps):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))

#We can use join to wait for all workers
# coordinator.join()

#And we can also use a global variable to count the number of steps.
while global_step < steps:
    time.sleep(1)
    print(accuracy.result())
    print(global_step)

I have made some dashes so that don't need to read the whole code. The code for the workers and the ps is very simple. As you can see in the chief code in worker_step there is code to print the id of the process that is running the step. I thought that it should print the pid of the worker, but it prints the pid of the chief instead. Also when I printed the TF_CONFIG, it said that the process that is running the worker_step is the chief.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论