FATE Part 2.2.5, Homo-NN: Customizing a Trainer to Control the Training Process

Preface
In this tutorial, you will learn how to create and customize your own trainer to control the training process, make predictions, and aggregate results to meet your specific needs. We will first introduce the interfaces of the class you need to implement. Then we will walk through a toy implementation of the FedProx algorithm (note that this is only a toy example and should not be used in production) to help you better understand the idea of trainer customization.
The TrainerBase Class
TrainerBase is the base class of all Homo-NN trainers in FATE. To create a custom trainer, you need to subclass TrainerBase, which is located in federatedml.nn.homo.trainer.trainer_base. You must implement two required functions: train(), which takes a training Dataset instance, an optional validation Dataset instance, a PyTorch optimizer, a PyTorch loss, and an extra_data dict, and is called on the client side to run local training and federation; and server_aggregate_procedure(), which takes an extra_data dict and is called on the server side to run the aggregation process.
There is also an optional predict() function: it takes a single parameter, a dataset, and lets you define how your trainer makes predictions. If you want to use the FATE framework's features, you need to make sure the returned data is formatted correctly so that FATE can display it properly (we will cover this format in a later tutorial).
In the Homo-NN client component, the set_model() function is used to attach the initialized model to the trainer. When developing a trainer, call set_model() to set the model; after that you can access it inside the trainer through self.model.
The source code of these interfaces is shown here:
class TrainerBase(object):

    def __init__(self, **kwargs):
        ...
        self._model = None
        ...

    @property
    def model(self):
        if not hasattr(self, '_model'):
            raise AttributeError('model variable is not initialized, remember to call'
                                 ' super(your_class, self).__init__()')
        if self._model is None:
            raise AttributeError('model is not set, use set_model() function to set training model')
        return self._model

    @model.setter
    def model(self, val):
        self._model = val

    @abc.abstractmethod
    def train(self, train_set, validate_set=None, optimizer=None, loss=None, extra_data={}):
        """
        train_set: a Dataset instance; must be an instance of a Dataset subclass
                   (federatedml.nn.dataset.base), e.g. TableDataset() (from federatedml.nn.dataset.table)
        validate_set: a Dataset instance, optional; must be an instance of a Dataset subclass
                      (federatedml.nn.dataset.base), e.g. TableDataset() (from federatedml.nn.dataset.table)
        optimizer: a pytorch optimizer class instance, e.g. t.optim.Adam(), t.optim.SGD()
        loss: a pytorch loss class, e.g. nn.BCELoss(), nn.CrossEntropyLoss()
        """
        pass

    @abc.abstractmethod
    def predict(self, dataset):
        pass

    @abc.abstractmethod
    def server_aggregate_procedure(self, extra_data={}):
        pass
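To make these interfaces concrete, here is a minimal do-nothing subclass. This is our own sketch rather than FATE source; the class name MyTrainer and its epochs parameter are purely illustrative:

from federatedml.nn.homo.trainer.trainer_base import TrainerBase

class MyTrainer(TrainerBase):

    def __init__(self, epochs=1, **kwargs):
        # remember to call the parent __init__, otherwise accessing self.model raises
        super(MyTrainer, self).__init__(**kwargs)
        self.epochs = epochs

    def train(self, train_set, validate_set=None, optimizer=None, loss=None, extra_data={}):
        # runs on the client side; self.model is available once set_model() has been called
        for epoch in range(self.epochs):
            pass  # the local training loop and federation steps go here

    def server_aggregate_procedure(self, extra_data={}):
        pass  # the server-side aggregation goes here

When this trainer runs inside the Homo-NN component, set_model() is called for you; when testing locally, you call trainer.set_model(model) yourself, as shown in the Local Test section below.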
Fed Mode / Local Mode
The trainer has an attribute self.fed_mode, which is set to True when running a federated task. You can use this variable to determine whether the trainer is running in federated mode or in local debugging mode. If you want to test the trainer locally, use the local_mode() function to set self.fed_mode to False.
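As a quick illustration of this pattern, the sketch below shows how train() in the toy trainer later in this tutorial branches on self.fed_mode (the 'my_trainer' suffix is a placeholder of ours):

from federatedml.framework.homo.aggregator.secure_aggregator import SecureAggregatorClient

def train(self, train_set, validate_set=None, optimizer=None, loss=None, extra_data={}):
    aggregator = None
    if self.fed_mode:
        # federated run: create a client-side aggregator to communicate with the server
        aggregator = SecureAggregatorClient(True, aggregate_weight=len(train_set),
                                            communicate_match_suffix='my_trainer')
    # when aggregator is None (local mode), all federation steps are simply skipped
    ...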
Example: Developing a Toy FedProx
To help you understand how to implement these functions, we will provide a concrete example by walking through a toy implementation of the FedProx algorithm. In FedProx, the training process differs slightly from the standard FedAVG algorithm: when computing the loss, a proximal term has to be computed from the current model and the global model. We will take you through the commented code step by step.
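For reference, the client-side objective that FedProx minimizes can be written as follows, where u is the proximal coefficient (mu in the paper) and w_global is the model last received from the server:

    loss(w) = task_loss(w) + (u / 2) * ||w - w_global||^2

The second term penalizes the local model for drifting away from the global model; it is what the _proximal_term() helper in the code below is meant to compute.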
Toy FedProx
Here is the code of the ToyFedProxTrainer, which is saved under the federatedml.nn.homo.trainer module (as fedprox.py) by the save_to_fate magic below. This trainer implements two functions, train() and server_aggregate_procedure(), which are enough to complete a simple federated training task. The code contains comments with more details.
from pipeline.component.nn import save_to_fate
%%save_to_fate trainer fedprox.py
import copy
import torch as t
from federatedml.nn.homo.trainer.trainer_base import TrainerBase
from torch.utils.data import DataLoader
# We need to use the aggregator client & server classes for federation
from federatedml.framework.homo.aggregator.secure_aggregator import SecureAggregatorClient, SecureAggregatorServer
# We use LOGGER to output logs
from federatedml.util import LOGGER


class ToyFedProxTrainer(TrainerBase):

    def __init__(self, epochs, batch_size, u):
        super(ToyFedProxTrainer, self).__init__()
        # trainer parameters
        self.epochs = epochs
        self.batch_size = batch_size
        self.u = u

    # Given two models, compute the proximal term
    def _proximal_term(self, model_a, model_b):
        diff_ = 0
        for p1, p2 in zip(model_a.parameters(), model_b.parameters()):
            diff_ += t.sqrt((p1 - p2.detach()) ** 2).sum()
        return diff_

    # Implement the train function. This function is called on the client side;
    # it contains the local training process and the federation part.
    def train(self, train_set, validate_set=None, optimizer=None, loss=None, extra_data={}):

        sample_num = len(train_set)
        aggregator = None
        if self.fed_mode:
            # initialize the aggregator
            aggregator = SecureAggregatorClient(True, aggregate_weight=sample_num,
                                                communicate_match_suffix='fedprox')

        # set up the dataloader
        dl = DataLoader(train_set, batch_size=self.batch_size, num_workers=4)

        for epoch in range(self.epochs):
            # the local training process
            LOGGER.debug('running epoch {}'.format(epoch))
            global_model = copy.deepcopy(self.model)
            loss_sum = 0
            # batch training process
            for batch_data, label in dl:
                optimizer.zero_grad()
                pred = self.model(batch_data)
                loss_term_a = loss(pred, label)
                loss_term_b = self._proximal_term(self.model, global_model)
                loss_ = loss_term_a + (self.u / 2) * loss_term_b
                loss_.backward()
                loss_sum += float(loss_.detach().numpy())
                optimizer.step()
            # print the loss
            LOGGER.debug('epoch loss is {}'.format(loss_sum))

            # the aggregation process
            if aggregator is not None:
                self.model = aggregator.model_aggregation(self.model)
                converge_status = aggregator.loss_aggregation(loss_sum)

    # Implement the aggregation function. This function is called on the server side.
    def server_aggregate_procedure(self, extra_data={}):

        # initialize the aggregator
        if self.fed_mode:
            aggregator = SecureAggregatorServer(communicate_match_suffix='fedprox')

            # the aggregation process is simple: every epoch the server
            # aggregates the model and the loss once
            for i in range(self.epochs):
                aggregator.model_aggregation()
                merge_loss, _ = aggregator.loss_aggregation()
Local Test
We can use local_mode() to test the new trainer locally.
import torch as t
from federatedml.nn.dataset.table import TableDataset

model = t.nn.Sequential(
    t.nn.Linear(30, 1),
    t.nn.Sigmoid()
)

ds = TableDataset()
ds.load('../examples/data/breast_homo_guest.csv')  # adjust this path to your own file location

trainer = ToyFedProxTrainer(10, 128, u=0.1)
trainer.set_model(model)
opt = t.optim.Adam(model.parameters(), lr=0.01)
loss = t.nn.BCELoss()
# Author's note: BCELoss requires its inputs (not the labels) to lie in (0, 1),
# otherwise it raises an error. The Sigmoid in the model should already guarantee
# this, so the source of the loss error was unclear; as a workaround the author
# first replaced BCELoss with MSELoss.
trainer.local_mode()
trainer.train(ds, None, opt, loss)
Training here raised an error. After some debugging, the author found that the outputs become NaN after passing through the Linear(30, 1) layer. If any reader knows how to solve this, please let me know.
After the author raised this issue on the official site, the FATE team gave the following answer:
# The fix: drop the t.sqrt() call and use the plain squared L2 distance.
# A likely cause of the NaNs: sqrt has an unbounded gradient at 0, and since
# global_model starts each epoch as an exact copy of self.model, p1 - p2 is
# exactly 0 on the first batch, so backprop produced NaN gradients.
def _proximal_term(self, model_a, model_b):
    diff_ = 0
    for p1, p2 in zip(model_a.parameters(), model_b.parameters()):
        diff_ += ((p1 - p2.detach()) ** 2).sum()
    return diff_
It works! Next, we will submit a federated task to see whether our trainer behaves correctly.
Submit a New Task for Testing
# torch
import torch as t
from torch import nn
from pipeline import fate_torch_hook
fate_torch_hook(t)
# pipeline
from pipeline.component.homo_nn import HomoNN, TrainerParam  # HomoNN component; TrainerParam sets trainer parameters
from pipeline.backend.pipeline import PipeLine  # pipeline class
from pipeline.component import Reader, DataTransform, Evaluation  # data I/O and evaluation
from pipeline.interface import Data  # data interface for defining the data flow

# create a pipeline for submitting the job
guest = 9999
host = 10000
arbiter = 10000
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest, host=host, arbiter=arbiter)

# read the uploaded dataset
train_data_0 = {"name": "breast_homo_guest", "namespace": "experiment"}
train_data_1 = {"name": "breast_homo_host", "namespace": "experiment"}
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=train_data_0)
reader_0.get_party_instance(role='host', party_id=host).component_param(table=train_data_1)

# the transform component converts the uploaded data to the FATE standard format
data_transform_0 = DataTransform(name='data_transform_0')
data_transform_0.get_party_instance(role='guest', party_id=guest).component_param(with_label=True, output_format="dense")
data_transform_0.get_party_instance(role='host', party_id=host).component_param(with_label=True, output_format="dense")

"""
Define the PyTorch model, optimizer and loss
"""
model = nn.Sequential(
    nn.Linear(30, 1),
    nn.Sigmoid()
)
loss = nn.BCELoss()
optimizer = t.optim.Adam(model.parameters(), lr=0.01)

"""
Create the Homo-NN component
"""
nn_component = HomoNN(name='nn_0',
                      model=model,  # set model
                      loss=loss,  # set loss
                      optimizer=optimizer,  # set optimizer
                      # Here we use our toy fedprox trainer;
                      # TrainerParam passes the parameters to it
                      trainer=TrainerParam(trainer_name='fedprox', epochs=3, batch_size=128, u=0.5),
                      torch_seed=100  # random seed
                      )

# define the work flow
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(nn_component, data=Data(train_data=data_transform_0.output.data))
pipeline.compile()
pipeline.fit()
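Once the job finishes, you can inspect the output of the Homo-NN component to verify that the trainer produced results. The snippet below assumes the standard FATE pipeline API (get_component() and get_output_data()):

# fetch the output data of the Homo-NN component (assumes the standard pipeline API)
nn_output = pipeline.get_component('nn_0').get_output_data()
print(nn_output)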