Quick Start
OpenFL has a variety of APIs to choose from when setting up and running a federation. In this quick start guide, we will demonstrate how to run a simple federated learning example using the Task Runner API.
Creating a federation in 5 steps
To begin, we recommend installing the latest OpenFL inside a Python virtual environment. This can be done with the following commands:
pip install virtualenv
virtualenv ~/openfl-quickstart
source ~/openfl-quickstart/bin/activate
git clone https://github.com/securefederatedai/openfl.git
cd openfl
pip install .
Now you’re ready to run your first federation! Copying these commands to your terminal will run a simple federation with an aggregator and two collaborators, all on your local machine. These commands can be broken down into five steps:
Setup Federation Workspace & Certificate Authority (CA) for Secure Communication
Setup Aggregator & Initialize Federation Plan + Model
Setup Collaborator 1
Setup Collaborator 2
Run the Federation
############################################################################################
# Step 1: Setup Federation Workspace & Certificate Authority (CA) for Secure Communication #
############################################################################################
# Generate an OpenFL Workspace. This example will train a PyTorch
# CNN model on the MNIST dataset
fx workspace create --template torch_cnn_mnist --prefix my_workspace
cd my_workspace
# This will create a certificate authority (CA) so that participants can communicate over a secure TLS channel
fx workspace certify
#################################################################
# Step 2: Setup Aggregator & Initialize Federation Plan + Model #
#################################################################
# Generate a Certificate Signing Request (CSR) for the Aggregator
fx aggregator generate-cert-request
# The CA signs the aggregator's request, which is now available in the workspace
fx aggregator certify --silent
# Initialize FL Plan and Model Weights for the Federation
fx plan initialize
################################
# Step 3: Setup Collaborator 1 #
################################
# Create a collaborator named "collaborator1" that will use data path "1"
fx collaborator create -n collaborator1 -d 1
# Generate a CSR for collaborator1
fx collaborator generate-cert-request -n collaborator1
# The CA signs collaborator1's certificate
fx collaborator certify -n collaborator1 --silent
################################
# Step 4: Setup Collaborator 2 #
################################
# Create a collaborator named "collaborator2" that will use data path "2"
fx collaborator create -n collaborator2 -d 2
# Generate a CSR for collaborator2
fx collaborator generate-cert-request -n collaborator2
# The CA signs collaborator2's certificate
fx collaborator certify -n collaborator2 --silent
##############################
# Step 5: Run the Federation #
##############################
# Run the Aggregator
fx aggregator start &
# Run Collaborator 1
fx collaborator start -n collaborator1 &
# Run Collaborator 2
fx collaborator start -n collaborator2
echo "Congratulations! You've run your first federation with OpenFL"
You should see this output at the end of the experiment:
INFO Starting round 9... aggregator.py:897
[15:36:28] INFO Waiting for tasks... collaborator.py:178
INFO Sending tasks to collaborator collaborator2 for round 9 aggregator.py:329
INFO Received the following tasks: [name: "aggregated_model_validation" collaborator.py:143
, name: "train"
, name: "locally_tuned_model_validation"
]
[15:36:30] METRIC Round 9, collaborator collaborator2 is sending metric for task aggregated_model_validation: accuracy 0.983597 collaborator.py:415
[15:36:31] INFO Collaborator collaborator2 is sending task results for aggregated_model_validation, round 9 aggregator.py:520
METRIC Round 9, collaborator validate_agg aggregated_model_validation result accuracy: 0.983597 aggregator.py:559
[15:36:31] INFO Run 0 epoch of 9 round runner_pt.py:148
[15:36:31] INFO Waiting for tasks... collaborator.py:178
INFO Sending tasks to collaborator collaborator1 for round 9 aggregator.py:329
INFO Received the following tasks: [name: "aggregated_model_validation" collaborator.py:143
, name: "train"
, name: "locally_tuned_model_validation"
]
[15:36:33] METRIC Round 9, collaborator collaborator1 is sending metric for task aggregated_model_validation: accuracy 0.981000 collaborator.py:415
[15:36:34] INFO Collaborator collaborator1 is sending task results for aggregated_model_validation, round 9 aggregator.py:520
METRIC Round 9, collaborator validate_agg aggregated_model_validation result accuracy: 0.981000 aggregator.py:559
[15:36:34] INFO Run 0 epoch of 9 round runner_pt.py:148
[15:36:34] METRIC Round 9, collaborator collaborator2 is sending metric for task train: cross_entropy 0.059750 collaborator.py:415
[15:36:35] INFO Collaborator collaborator2 is sending task results for train, round 9 aggregator.py:520
METRIC Round 9, collaborator metric train result cross_entropy: 0.059750 aggregator.py:559
[15:36:35] METRIC Round 9, collaborator collaborator2 is sending metric for task locally_tuned_model_validation: accuracy 0.979596 collaborator.py:415
INFO Collaborator collaborator2 is sending task results for locally_tuned_model_validation, round 9 aggregator.py:520
METRIC Round 9, collaborator validate_local locally_tuned_model_validation result accuracy: 0.979596 aggregator.py:559
INFO Waiting for tasks... collaborator.py:178
[15:36:37] METRIC Round 9, collaborator collaborator1 is sending metric for task train: cross_entropy 0.019203 collaborator.py:415
[15:36:38] INFO Collaborator collaborator1 is sending task results for train, round 9 aggregator.py:520
METRIC Round 9, collaborator metric train result cross_entropy: 0.019203 aggregator.py:559
[15:36:38] METRIC Round 9, collaborator collaborator1 is sending metric for task locally_tuned_model_validation: accuracy 0.977600 collaborator.py:415
INFO Collaborator collaborator1 is sending task results for locally_tuned_model_validation, round 9 aggregator.py:520
METRIC Round 9, collaborator validate_local locally_tuned_model_validation result accuracy: 0.977600 aggregator.py:559
METRIC Round 9, aggregator: train <openfl.interface.aggregation_functions.weighted_average.WeightedAverage object at aggregator.py:838
0x7f329a98bee0> cross_entropy: 0.039476
[15:36:39] METRIC Round 9, aggregator: aggregated_model_validation <openfl.interface.aggregation_functions.weighted_average.WeightedAverage aggregator.py:838
object at 0x7f329a98bee0> accuracy: 0.982298
METRIC Round 9: saved the best model with score 0.982298 aggregator.py:854
METRIC Round 9, aggregator: locally_tuned_model_validation aggregator.py:838
<openfl.interface.aggregation_functions.weighted_average.WeightedAverage object at 0x7f329a98bee0> accuracy:
0.978598
INFO Saving round 10 model... aggregator.py:890
INFO Experiment Completed. Cleaning up... aggregator.py:895
[15:36:39] INFO Waiting for tasks... collaborator.py:178
INFO Sending signal to collaborator collaborator1 to shutdown... aggregator.py:283
INFO End of Federation reached. Exiting... collaborator.py:150
✔ OK
[15:36:46] INFO Waiting for tasks... collaborator.py:178
[15:36:46] INFO Sending signal to collaborator collaborator2 to shutdown... aggregator.py:283
INFO End of Federation reached. Exiting... collaborator.py:150
✔ OK
Congratulations! You've run your first federation with OpenFL
Working with your own model
Now that you’ve run your first federation, let’s see how to replace the model used in the federation. After copying in the text above, you should be in the my_workspace directory. Every workspace has a src directory that contains the Task Runner, an OpenFL interface that defines the deep learning model, as well as the training and validation functions that will run on that model. In this case, the Task Runner is defined in src/taskrunner.py. After opening it you’ll see the following:
from typing import Iterator, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from openfl.federated import PyTorchTaskRunner
from openfl.utilities import Metric


class PyTorchCNN(PyTorchTaskRunner):
    """
    Simple CNN for classification.

    PyTorchTaskRunner inherits from nn.Module, so you can define your model
    in the same way that you would for PyTorch
    """

    def __init__(self, device='cpu', **kwargs):
        """Initialize.

        Args:
            device: The hardware device to use for training (Default = "cpu")
            **kwargs: Additional arguments to pass to the function
        """
        super().__init__(device=device, **kwargs)

        ####################################
        #       Your model goes here       #
        ####################################
        self.conv1 = nn.Conv2d(1, 20, 2, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(800, 500)
        self.fc2 = nn.Linear(500, 10)
        self.to(device)
        ####################################

        ######################################################################
        #                     Your optimizer goes here                       #
        #                                                                    #
        # `self.optimizer` must be set for optimizer weights to be federated #
        ######################################################################
        self.optimizer = optim.Adam(self.parameters(), lr=1e-4)

        # Set the loss function
        self.loss_fn = F.cross_entropy

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x: Data input to the model for the forward pass
        """
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 800)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
PyTorchTaskRunner inherits from nn.Module, so changing your deep learning model is as easy as modifying the network layers (e.g. self.conv1, etc.) in the __init__ method and then defining your forward function. You’ll notice that, unlike plain PyTorch, the optimizer is also defined in this __init__ method. This is so the model AND optimizer weights can be distributed as part of the federation.
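For example, to swap in a small fully connected network in place of the CNN, a minimal sketch might look like the following (the class name and layer sizes are illustrative, not part of the template):

class PyTorchMLP(PyTorchTaskRunner):
    """Illustrative two-layer MLP in place of the CNN above."""

    def __init__(self, device='cpu', **kwargs):
        super().__init__(device=device, **kwargs)
        # Flattened 28x28 MNIST images feed two fully connected layers
        self.fc1 = nn.Linear(28 * 28, 256)
        self.fc2 = nn.Linear(256, 10)
        self.to(device)
        # `self.optimizer` must still be set so its weights are federated
        self.optimizer = optim.Adam(self.parameters(), lr=1e-3)
        self.loss_fn = F.cross_entropy

    def forward(self, x):
        x = x.view(x.shape[0], -1)  # flatten the input images
        x = F.relu(self.fc1(x))
        return self.fc2(x)

If you rename the class, also update the corresponding template reference in plan/plan.yaml so the plan points at your new Task Runner.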
Defining your own train and validate tasks
If you continue scrolling down in src/taskrunner.py, you’ll see two functions: train_ and validate_. These are the primary tasks performed by the collaborators that have access to local data.
    def train_(self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]) -> Metric:
        """
        Train single epoch.

        Override this function in order to use custom training.

        Args:
            train_dataloader: Train dataset batch generator. Yields (samples, targets) tuples of
            size = `self.data_loader.batch_size`.
        Returns:
            Metric: An object containing name and np.ndarray value.
        """
        losses = []
        for data, target in train_dataloader:
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()
            output = self(data)
            loss = self.loss_fn(output, target)
            loss.backward()
            self.optimizer.step()
            losses.append(loss.detach().cpu().numpy())
        loss = np.mean(losses)
        return Metric(name=self.loss_fn.__name__, value=np.array(loss))
    def validate_(self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]) -> Metric:
        """
        Perform validation on PyTorch Model

        Override this function for your own custom validation function

        Args:
            validation_dataloader: Validation dataset batch generator. Yields (samples, targets) tuples
        Returns:
            Metric: An object containing name and np.ndarray value
        """
        total_samples = 0
        val_score = 0
        with torch.no_grad():
            for data, target in validation_dataloader:
                samples = target.shape[0]
                total_samples += samples
                data, target = data.to(self.device), target.to(self.device, dtype=torch.int64)
                output = self(data)
                # get the index of the max log-probability
                pred = output.argmax(dim=1)
                val_score += pred.eq(target).sum().cpu().numpy()
        accuracy = val_score / total_samples
        return Metric(name='accuracy', value=np.array(accuracy))
Each function is passed a dataloader and returns a Metric associated with that task. In this example the train_ function returns the cross-entropy loss for an epoch, and the validate_ function returns the accuracy. You’ll see these metrics reported when running the collaborator locally, and the aggregator will report the average metrics coming from all collaborators.
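Because any Metric can be returned, overriding these tasks is straightforward. As a minimal sketch, here is how validate_ could be overridden inside your Task Runner class to report the mean validation loss instead of accuracy (this variant is illustrative, not part of the template):

    def validate_(self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]) -> Metric:
        """Report mean validation loss instead of accuracy (illustrative)."""
        losses = []
        with torch.no_grad():
            for data, target in validation_dataloader:
                data = data.to(self.device)
                target = target.to(self.device, dtype=torch.int64)
                output = self(data)
                # Accumulate the per-batch loss on the CPU
                losses.append(self.loss_fn(output, target).cpu().numpy())
        return Metric(name='validation_loss', value=np.array(np.mean(losses)))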
Defining your own data loader
Now let’s look at the OpenFL PyTorchDataLoader and see how, by subclassing it, we are able to split the MNIST dataset across collaborators for training. You’ll find the following defined in src/dataloader.py.
from openfl.federated import PyTorchDataLoader
from src.mnist_utils import load_mnist_shard  # helper bundled with the workspace template


class PyTorchMNISTInMemory(PyTorchDataLoader):
    """PyTorch data loader for MNIST dataset."""

    def __init__(self, data_path, batch_size, **kwargs):
        """Instantiate the data object.

        Args:
            data_path: The file path to the data
            batch_size: The batch size of the data loader
            **kwargs: Additional arguments, passed to super
             init and load_mnist_shard
        """
        super().__init__(batch_size, **kwargs)

        num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard(
            shard_num=int(data_path), **kwargs)

        self.X_train = X_train
        self.y_train = y_train
        self.train_loader = self.get_train_loader()

        self.X_valid = X_valid
        self.y_valid = y_valid
        self.val_loader = self.get_valid_loader()

        self.num_classes = num_classes
This example uses the classic MNIST dataset for digit recognition. For in-memory datasets, the data_path is passed a number that determines which slice of the dataset the collaborator should receive. By initializing the train_loader (self.train_loader = self.get_train_loader()) and the val_loader (self.val_loader = self.get_valid_loader()), these dataloaders can then be passed into the train_ and validate_ functions defined above.
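The same pattern applies to your own data: subclass PyTorchDataLoader, load this collaborator’s shard in __init__, and populate the X_train/y_train/X_valid/y_valid attributes before building the loaders. A minimal sketch, where load_my_dataset stands in for a hypothetical helper you would write yourself:

from openfl.federated import PyTorchDataLoader

class MyDataLoader(PyTorchDataLoader):
    """Illustrative loader for a custom dataset (not part of the template)."""

    def __init__(self, data_path, batch_size, **kwargs):
        super().__init__(batch_size, **kwargs)
        # `load_my_dataset` is a hypothetical helper that returns numpy
        # arrays for this collaborator's shard of the data
        X_train, y_train, X_valid, y_valid = load_my_dataset(data_path)
        self.X_train, self.y_train = X_train, y_train
        self.X_valid, self.y_valid = X_valid, y_valid
        self.train_loader = self.get_train_loader()
        self.val_loader = self.get_valid_loader()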
Changing the number of federated rounds
Now that we’ve seen how to change the code, let’s explore the Federated Learning Plan (FL Plan). The plan, which is defined in plan/plan.yaml, is used to configure everything about the federation that can’t purely be expressed in Python. This includes information like network connectivity details, how different components are configured, and how many rounds the federation should train. Different experiments may take more rounds to converge depending on how similar the data is across collaborators, the model architecture, and the number of collaborators that participate. To tweak this parameter for your experiment, open plan/plan.yaml and modify the following section:
aggregator:
  settings:
    best_state_path: save/torch_cnn_mnist_best.pbuf
    db_store_rounds: 2
    init_state_path: save/torch_cnn_mnist_init.pbuf
    last_state_path: save/torch_cnn_mnist_last.pbuf
    log_metric_callback:
      template: src.utils.write_metric
    rounds_to_train: 10  # Change this value to train for a different number of rounds
    write_logs: true
Starting a new federation after making custom changes
Now that you’ve changed a few things, you can rerun the federation. Copying the commands below will reinitialize your plan with new model weights and relaunch the aggregator and two collaborators:
fx plan initialize
fx aggregator start &
fx collaborator start -n collaborator1 &
fx collaborator start -n collaborator2
Well done! Now that you know the basics of using the Task Runner API to run OpenFL on a single node, check out some of the other Examples for Running a Federation, both for research purposes and in production.