Index ソフト・ハード PyTorch | rankingSample |
RankNet ネットワーク 損失関数 データセット 機能・要件 構成・方式 タスク ライブラリ 導入 sample datasets myData |
RankNet
class Net(nn.Module):
def __init__(self, D):
super(Net, self).__init__()
self.l1 = nn.Linear(D, 10)
self.l2 = nn.Linear(10, 1)
def forward(self, x):
x = torch.sigmoid(self.l1(x))
x = self.l2(x)
return x
def pairwise_loss(s_i, s_j, S_ij, sigma=1):
C = torch.log1p(torch.exp(-sigma * (s_i - s_j)))
if S_ij == -1:
C += sigma * (s_i - s_j)
elif S_ij == 0:
C += 0.5 * sigma * (s_i - s_j)
elif S_ij == 1:
pass
else:
raise ValueError("S_ij: -1/0/1")
return C
# データセット
def make_dataset(N_train, N_valid, D):
ws = torch.randn(D, 1)
X_train = torch.randn(N_train, D, requires_grad=True)
X_valid = torch.randn(N_valid, D, requires_grad=True)
ys_train_score = torch.mm(X_train, ws)
ys_valid_score = torch.mm(X_valid, ws)
bins = [-2, -1, 0, 1] # 5 relevances
ys_train_rel = torch.Tensor(
np.digitize(ys_train_score.clone().detach().numpy(), bins=bins)
)
ys_valid_rel = torch.Tensor(
np.digitize(ys_valid_score.clone().detach().numpy(), bins=bins)
)
return X_train, X_valid, ys_train_rel, ys_valid_rel
def main(): # main
import argparse # コマンドライン引数を解析(ハイフォンはアンダーバーに置き換わる。)
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--verbose', action='store_true', default=False, help='enables verbose logging') # ログ表示(デバック時)
parser.add_argument('--batch_size', type=int, default=64, metavar='N', help='input batch size for training (default: 64)') # 訓練時バッチサイズ
parser.add_argument('--test_batch_size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)') # テスト時バッチサイズ
parser.add_argument('--no_shuffle', action='store_true', default=False, help='disables dataset shuffling') # 訓練データをシャッフルしない。
parser.add_argument('--epochs', type=int, default=20, metavar='N', help='number of epochs to train (default: 10)') # 訓練時のエポック数
parser.add_argument('--lr', type=float, default=0.001, metavar='LR', help='learning rate (default: 0.01)') # 学習率
parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed (default: 1)') # 乱数のシードを指定
parser.add_argument('--no_cuda', action='store_true', default=False, help='disables CUDA training') # GPUがあってもCUDAを使用しない。(デバック時)
parser.add_argument('--dry_run', action='store_true', default=False, help='quickly check a single pass') # デック用に1バッチ実行
parser.add_argument('--log_interval', type=int, default=10, metavar='N', help='how many batches to wait before logging training status') # 進捗表示間隔
parser.add_argument('--save_model', type=str, metavar='path', default=None, help='saves model to file') # モデル(ネット)を保存、読み込むパス
parser.add_argument('--datadir', type=str, default="") #
args = parser.parse_args()
torch.manual_seed(args.seed) # 乱数シードを設定
level = (logging.DEBUG if args.verbose else logging.INFO) # ログ出力を設定
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=level)
use_cuda = not args.no_cuda and torch.cuda.is_available() # CUDA の使用・不使用
device = torch.device("cuda" if use_cuda else "cpu")
print(device)
train_kwargs = {'batch_size': args.batch_size, 'shuffle': not args.no_shuffle} # バッチサイズその他のパラメータを設定
test_kwargs = {'batch_size': args.test_batch_size}
if use_cuda:
cuda_kwargs = {'pin_memory': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
print(args)
train_dataloader = DataLoader(training_data, **train_kwargs, num_workers=2) # 訓練データ読み込み
test_dataloader = DataLoader(test_data, **train_kwargs, num_workers=2) # テストデータ読み込み
model = MyNet() # モデル作成(ネットワーク構造の構築)
if args.save_model is not None:
logging.info(f'Loading: {args.save_model}...') # モデルをファイルから読み込む
try:
params = torch.load(args.save_model, map_location=device)
model.load_state_dict(params)
except FileNotFoundError as e:
logging.error(f'Error: {e}')
model = model.to(device)
print(model)
optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) # 最適化器と学習率 lr=0.001
# エポック回だけ訓練・テストを繰り返す。
for epoch in range(args.epochs):
logging.info(f'*** epoch={epoch+1}/{args.epochs} ***')
train(model, device, train_dataloader, optimizer, log_interval=args.log_interval, dry_run=args.dry_run)
test(model, device, test_dataloader)
# モデルをファイルに保存する。
# if args.save_model is not None:
# logging.info(f'Saving: {args.save_model}...')
# params = model.state_dict()
# torch.save(params, args.save_model)
# model = models.vgg16(pretrained=True)
PATH = "./model_weight.pth"
torch.save(model.state_dict(), PATH) # 学習済パラメータを辞書に保存
torch.save(model, 'model.pth') # 形状を含み保存
return
class MyNet(nn.Module): # ネットワーク構造の定義
def __init__(self): # 各レイヤーの初期化①
super(MyNet,self).__init__()
self.conv1 = nn.Conv2d(1,32,3,1) # 畳み込み
self.conv2 = nn.Conv2d(32,64,3,1) # 畳み込み
self.pool = nn.MaxPool2d(2,2) # Max Pooling(1/2)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(12*12*64,128) # 全接続 (fully connected)
self.fc2 = nn.Linear(128,10)
return
def forward(self,x): # ミニバッチ x を処理
x = self.conv1(x)
x = f.relu(x)
x = self.conv2(x)
x = f.relu(x)
x = self.pool(x)
x = self.dropout1(x)
x = x.view(-1,12*12*64)
x = self.fc1(x)
x = f.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
return f.log_softmax(x, dim=1)
# 各レイヤーの初期化②
# def __init__(self):
# super(MyNet,self).__init__()
# self.conv1 = nn.Conv2d(1, 10, 3) # 畳み込み
# self.pool1 = nn.MaxPool2d(2) # Max Pooling(1/2)
# self.conv2 = nn.Conv2d(10, 20, 3) # 畳み込み
# self.pool2 = nn.MaxPool2d(2) # Max Pooling(1/2)
# self.fc1 = nn.Linear(20*5*5, 10) # 全接続 (fully connected)
# return
# def forward(self, x): # ミニバッチ x を処理
# x = self.conv1(x) # x: (N × 1 × 28 × 28)
# x = f.relu(x)
# x = self.pool1(x)
# x = self.conv2(x)
# x = f.relu(x)
# x = self.pool2(x)
# x = x.reshape(len(x), 20*5*5)
# x = self.fc1(x)
# return x
def train(model, device, loader, optimizer, log_interval=1, dry_run=False): # 1エポック分の訓練
model.train()
for (idx, (images, labels)) in enumerate(loader): # 各ミニバッチを処理
inputs = images.float().to(device) # 入力をfloat型のテンソルに変換
targets = labels.long().to(device) # 正解をlong型のテンソルに変換
optimizer.zero_grad() # すべての勾配(.grad)をクリア
outputs = model(inputs) # 与えたミニバッチをニューラルネットワークで処理
loss = f.cross_entropy(outputs, targets) # 損失を計算
loss.backward() # 勾配を計算
optimizer.step() # 重み、バイアスを更新
if idx % 100 == 99:
# if dry_run or ((idx+1) % log_interval) == 0: # 現在の状況を表示
avg_loss = loss.item() / len(outputs)
# logging.info(f'train: batch={idx+1}/{len(loader)}, loss={avg_loss:.4f}')
# if dry_run: # dry_run モードの場合、1回で終了
# break
return
def test(model, device, loader): # テスト
model.eval() # 検証/テストモード
correct = 0
with torch.no_grad(): # 以下の処理ではautograd機能を使わない。
for (idx, (images, labels)) in enumerate(loader): # 各ミニバッチを処理
inputs = images.float().to(device) # 入力をfloat型のテンソルに変換
targets = labels.long().to(device) # 正解をlong型のテンソルに変換
outputs = model(inputs) # 与えたミニバッチをニューラルネットワークで処理
n = 0 # 正解かどうかを判定
for (y,label) in zip(outputs, targets):
i = torch.argmax(y)
if i == label:
n += 1
logging.debug(f'test: batch={idx+1}/{len(loader)}, correct={n}/{len(outputs)}')
correct += n
total = len(loader.dataset) # 結果を表示
logging.info(f'test: total={correct}/{total} ({100*correct/total:.2f}%)')
return
if __name__ == "__main__": main()
|
All Rights Reserved. Copyright (C) ITCL |