
このブログでは、StockNet というディープラーニングモデルをレビューします。このモデルは、マルチモーダルデータを使用して株価の動きを予測するモデルであり、2018年にACL(Association for Computational Linguistics、計算言語学協会)のAI分野におけるトップクラスの国際会議で発表されました。
StockNet では、株価という時系列データと、X(旧Twitter)で収集されたテキストデータを処理するための明確なパイプラインが提示されています。また、単語埋め込み(word embedding)でアテンションメカニズムを採用しているだけでなく、潜在状態を考慮した損失関数にもアテンションメカニズムを統合しています。
しかし、StockNet は2018年に発表されたにもかかわらず、非常に古い Python 2 と TensorFlow 1.4.0 でコーディングされていました(ソースコード)。そのため、StockNet のアーキテクチャを学びながら、Python 3.10 と PyTorch 2.3.1 を使用して再構築を行いました。
コードアーキテクチャは以下のように再構築されました。旧バージョンのコードと比較すると、三層モデルは1つのモジュールに統合するのではなく、分離して構築されています。これにより、メンテナンス性が向上しています。
旧バージョンコード:
再構築したコード:
このブログには、まだ最適化が不十分なコードが含まれています。しかし、本ブログの主な目的は、LSTM/GRU、アテンションメカニズムとVAE(変分オートエンコーダ)を理解している方が、このマルチモーダル処理の時系列予測モデルのアーキテクチャを参考にできるようにすることです。それにより、データ融合やマルチモーダル予測の分野に対する理解と洞察を深める助けになればと思います。より深い理解を得るために、元の論文を読んだ上で本ブログをお読みいただくことを強くお勧めします。
また、元論文では主に生成モデルの変種が使用されており、損失関数の導出は変分推論に基づいています。これは確率モデル$\log p (y \mid X)$から導出が始まります。多くの変分推論に基づく論文と同様に、その導出過程がVAE(変分オートエンコーダ)の初期論文に似ているため、詳細な導出プロセスは省略されることが一般的です。しかし、本ブログではこの論文の損失関数の導出詳細をさらに掘り下げて説明します。これにより、確率モデルが深層学習でどのように応用されるかの理解を深めるだけでなく、ベイズ学派のモデリングにおける考え方についても理解を促進することを目指しています。
背景
従来の株式市場予測モデルは、ニュースやソーシャルメディアの内容を活用することが一般的でした。特に、ディープラーニングの普及に伴い、構造化イベント表現を利用するアプローチが注目されています。しかし、これらの手法では、以下の3つの課題に直面していました。
- 市場の確率性: 株価は新しい情報に大きく依存しており、ランダムウォークのパターンを示す。
- 混沌とした情報: ソーシャルメディア上のテキストデータはカオス的であり、効果的な処理が困難。
- 時間的依存関係: 株価の変動は、過去の動向から影響を受けるが、既存の研究ではこれを十分に考慮していない。
StockNetの概要
StockNetは、ツイートと株価履歴から株価の動向を予測するための深層生成モデルです。本モデルの主な特長は以下の通りです。
- 連続潜在変数を利用: 市場の確率的特性を捉えるために、連続的な潜在変数を採用。
- 変分推論の適用: ニューラル変分推論を利用し、計算負荷の高い事後分布推定を効果的に解決。
- 時系列的補助目標の設定: 過去のデータとの関連性を学習することで、予測精度を向上。
データ収集と前処理
本研究のデータセットはここに公開されています。以下がその概要です。
- 期間: 2014年1月1日から2016年1月1日までの2年間。
- 対象株式: 各業界の代表的な88銘柄。
- ツイートデータ: 株式シンボルを含むツイートを収集し、質の高いテキストを選別。
- 株価履歴データ: 過去の株価データをYahoo Financeから取得。
データの前処理とパラメーターコントローラー(設定ファイルおよびそのローダーを含む ConfigLoader.py)は、旧バージョンのソースコード(src/DataPipe.py、config.yml、ConfigLoader.py)とほぼ同じです。DataPipe.py では、主に以下の3つの機能が実現されています:
- バッチの生成(バッチサイズの定義、単語埋め込みの取得、適切な行列形状の生成を含む)
- 取引日を揃える(すべての日のデータが取引日のデータであるわけではなく、履歴価格データを取引日のテキストデータに合わせる必要がある)
- 事前学習済みモデル(GloVe)を使用した初期重みの生成
ロス関数の導出
潜在状態 𝑧を使用する場合、条件付き確率𝑦の𝑥における確率は、𝑧を積分することで周辺確率として表現できます。
$$\log p_\theta (y \mid X) = \log \int_z p_\theta (y, Z \mid X) \, dZ \tag{1}$$
変分推論を利用すると、次のように表されます。
$$\log p_\theta (y \mid X) = \log \int_z q_\phi (Z \mid X, y) \frac{p_\theta (y, Z \mid X)}{q_\phi (Z \mid X, y)} \, dZ \tag{2}$$
ジェンセンの不等式の導入:
凸関数𝑓(𝑥)に対しては、
$$f(\mathbb{E}[x]) \leq \mathbb{E}[f(x)], \quad \text{if } f(x) \text{ is convex} \tag{3}$$
凹関数𝑓(𝑥)に対しては、
$$f(\mathbb{E}[x]) \geq \mathbb{E}[f(x)], \quad \text{if } f(x) \text{ is concave} \tag{4}$$
ログ関数に適用すると、
$$\log(\mathbb{E}[x]) \geq \mathbb{E}[\log(x)] \tag{5}$$
この場合、$x = F(Z) = \frac{p_\theta(y, Z \mid X)}{q_\phi(Z \mid X, y)}$とおくと、
$$\log \int_z q_\phi (Z \mid X, y) \frac{p_\theta (y, Z \mid X)}{q_\phi (Z \mid X, y)} \, dz = \log \int_z p(Z) F(Z) \, dz = \log \mathbb{E}[F(Z)] \tag{6}$$
そして、
$$\log \mathbb{E}[F(Z)] \geq \mathbb{E}[\log(F(Z))] = \int_z p(Z) \log(F(Z)) \, dz = \int_z q_\phi (Z \mid X, y) \log \frac{p_\theta (y, Z \mid X)}{q_\phi (Z \mid X, y)} \, dz \tag{7}$$
が成り立ちます。この結果を使用すると、
$$\log p_\theta (y \mid X) \geq \mathbb{E}_q [\log p_\theta (y, Z \mid X) - \log q_\phi (Z \mid X, y)] \tag{8}$$
が得られます。
共通確率分布を利用すると、以下の変形になります。
$$\log p_\theta (y \mid X) \geq \mathbb{E}_q [\log p_\theta (y \mid Z, X) + \log p_\theta (Z \mid X)] - \log q_\phi (Z \mid X, y) \tag{9}$$
更に変形すると、
$$\log p_\theta (y \mid X) \geq \mathbb{E}_q [\log p_\theta (y, Z \mid X) - \log q_\phi (Z \mid X, y)] = \mathbb{E}_q {\log [p_\theta (y \mid Z, X) \cdot p_\theta (Z \mid X)] - \log q_\phi (Z \mid X, y)}$$
$$= \mathbb{E}_q [\log p_\theta (y \mid Z, X) + \log p_\theta (Z \mid X)] - \log q_\phi (Z \mid X, y) \tag{10}$$
に変形されます。
KLダイバージェンスの定義:
$$D_{KL}[q_\phi (Z \mid X, y) | p_\phi (Z \mid X, y)] = \mathbb{E}_q [\log q_\phi (Z \mid X, y) - \log p_\phi (Z \mid X, y)] \tag{11}$$
これを使用すると、以下の関係が得られます。
$$\log p_\theta (y \mid X) \geq \mathbb{E}_q [\log p_\theta (y \mid Z, X)] - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] \tag{12}$$
これは ELBO (Evidence Lower Bound)、つまり仮定した理想的な分布を近似するために最適化可能な方程式です。
以下の他の導出方法があります。これをさらに直接的に確認できます。まず方程式2から始めます。
共通確率分布を使用した分解:
$𝑝_\theta (𝑦 , 𝑍 \mid 𝑋)$を共通確率分布を用いて分解すると:
$$\log p_\theta (y \mid X) = \log \int_z q_\phi (Z \mid X, y) \frac{p_\theta (y \mid X, Z) \cdot p_\theta (Z \mid X)}{q_\phi (Z \mid X, y)} \, dZ \tag{13}$$
KLダイバージェンスの使用:
KLダイバージェンスの性質から次が成り立ちます:
$$\frac{p_\theta (Z \mid X)}{q_\phi (Z \mid X, y)} = \exp(-D_{KL}[q_\phi (Z \mid X, y) | p_\theta (Z \mid X)]) \tag{14}$$
この式を方程式13に代入すると:
$$\log p_\theta (y \mid X) = \log \int_z q_\phi (Z \mid X, y) \exp(-D_{KL}[q_\phi (Z \mid X, y) | p_\theta (Z \mid X)]) p_\theta (y \mid X, Z) \, dZ \tag{15}$$
KLダイバージェンスは定数なので、次のように整理できます:
$$\log p_\theta (y \mid X) = \log \exp(-D_{KL}[q_\phi (Z \mid X, y) | p_\theta (Z \mid X)]) \int_z q_\phi (Z \mid X, y) p_\theta (y \mid X, Z) \, dZ$$
$$= -D_{KL}[q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \log \int_z q_\phi (Z \mid X, y) p_\theta (y \mid X, Z) \, dZ \tag{16}$$
追加の分解:
さらに$𝑝_\theta (𝑍 \mid 𝑋 , 𝑦)$で分解すると:
$$\log p_\theta (y \mid X) = -D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \log \int_z \frac{q_\phi (Z \mid X, y)}{p_\theta (Z \mid X, y)} p_\theta (Z \mid X, y) p_\theta (y \mid X, Z) \, dZ$$
$$= -D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \log \int_z \exp(D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X, y)]) p_\theta (Z \mid X, y) p_\theta (y \mid X, Z) \, dZ$$
$$= D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X, y)] - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \log \int_z p_\theta (Z \mid X, y) p_\theta (y \mid X, Z) \, dZ$$
$$= D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X, y)] - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \log \mathbb{E}_q [p_\theta (y \mid X, Z)] \tag{17}$$
ここで、以下の元論文の式(10)には誤りがあることがわかります。理由もなく期待値の項の中に対数を置いています。しかし、この誤りは結果には影響を与えていません。
引き続きジェンセンを導入すると:
$$\log p_\theta (y \mid X) = D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X, y)] - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \log \mathbb{E}_q [p_\theta (y \mid X, Z)]$$
$$\geq D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X, y)] - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] + \mathbb{E}_q [\log p_\theta (y \mid X, Z)] \tag{18}$$
最初の項は、未知の事前分布(ground truth prior)に基づいており、その範囲は [0,1]です。したがって、残りの項は最適化の対象となる目的関数であり、これは最初の導出結果と一致します。
この論文における確率の連鎖律:
$$\log p_\theta (y, Z \mid X) = p_\theta (y_T \mid X, Z) p_\theta (z_t \mid z_{\lt T}, X) \prod_{t=1}^{T-1} p_\theta (y_t \mid x_{\leq t}, z_t) p_\theta (z_t \mid z_{\leq T}, y_t, x_{\leq t}) \tag{19}$$
この式は、次のような意味を持ちます:
- 最終日の予測$y_T$は、全ての潜在変数$Z$と$X$に基づいて生成されます。
- 潜在変数$z_t$は、それ以前の$Z$と$X$に基づいて生成されます。
- その他の日の予測$y_t$は、単一の潜在変数$z_t$と、それ以前の$X$に基づいて生成されます。
- 潜在変数$z_t$は、それ以前の$Z$、単一のラベル$y_t$、およびそれ以前の$X$に基づいて生成されます。
これらの実現には注意メカニズムが依存しています。
この式に基づく損失関数の導出:
損失関数は以下の形で定義されます:
$$L(\theta, \phi; X, y) = \mathbb{E}_{q_\phi (Z \mid X, y)} [\log p_\theta (y \mid X, Z)] - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] \tag{20}$$
モンテカルロ法の適用:
次のような積分を近似するためにモンテカルロ法が使用されます:
$$\mathcal{I} = \int_a^b f(x) \, dx \tag{21}$$
範囲[𝑎,𝑏]のランダムサンプル𝑥を用いて、期待値で積分結果を近似します:
$$\mathcal{I} \approx \frac{1}{N} \sum_{i=1}^N f(x_i) \tag{22}$$
この方法を用いると、以下の期待値を近似できます:
$$\mathbb{E}_{q_\phi (Z \mid X, y)} [\log p_\theta (y \mid X, Z)] \approx \frac{1}{K} \sum_{i=1}^K \log p_\theta (y \mid X, Z) \tag{23}$$
特に𝐾=1の場合:
$$L_{<T} (\theta, \phi; X, y) = \log p_\theta (y \mid X, Z) - D_{KL} [q_\phi (Z \mid X, y) | p_\theta (Z \mid X)] \tag{24}$$
損失関数の時間的分離:
損失関数は、最終日$𝑡=𝑇$とそれ以外の日$𝑡<𝑇$で分けられます:
- $𝑡<𝑇$の場合:
$$L(\theta, \phi; X, y) = \log p_\theta (y_t \mid x_{\leq t}, z_{\leq t}) - D_{KL} [q_\phi (z_t \mid z_{<t}, x_{\leq t}, y_t) | p_\theta (z_t \mid x_{\leq t}, x_{<t})] \tag{25}$$
- $𝑡=𝑇$の場合:
$$L(\theta, \phi; X, y) = \log p_\theta (y_t \mid X, Z) - D_{KL} [q_\phi (z_t \mid z_{<t}, x_{\leq t}, y_t) | p_\theta (z_t \mid x_{\leq t}, x_{<t})] \tag{26}$$
KL正則化効果を制御するためのアニーリングトリック:
𝜆を用いて KL 正則化の影響を制御します:
$$f_t = \log p_\theta (y_t \mid x_{\leq t}, z_{\leq t}) - \lambda D_{KL} [q_\phi (z_t \mid z_{<t}, x_{\leq t}, y_t) | p_\theta (z_t \mid x_{\leq t}, x_{<t})] \tag{27}$$
アテンションメカニズムを用いた最終損失関数:
条件$𝑡<𝑇$の下でアテンションメカニズムによる時間的重みを掛け合わせた場合、最終的な損失関数は以下のように表されます:
$$F(\theta, \phi; X, y) = \frac{1}{N} \sum_{n=1}^N v^{(n)} f^{(n)} \tag{28}$$
モデル構成
StockNetは以下の3つの主要コンポーネントで構成されています。
市場情報エンコーダ (MIE): ツイートと株価データをエンコードし、市場情報の質を向上。
MIE層では、埋め込まれたテキストデータと初期重みがこの層に入力されます。双方向GRUを使用して入力データをエンコードします。テキストデータのグローバル依存性を導入するために、アテンションメカニズム(Attention Mechanism)が使用され、最終的なMIE出力としてコーパス埋め込みを生成します。MIEの出力をVMDに入力する前に、コーパス埋め込みは履歴価格データと特徴レベルで結合する必要があります。
import torch
import torch.nn as nn
import torch.nn.functional as F
from ConfigLoader import logger, ss_size, vocab_size, config_model
class MIE(nn.Module):
def __init__(self, init_word_table):
super(MIE, self).__init__()
logger.info('INIT: #stock: {0}, #vocab+1: {1}'.format(ss_size, vocab_size))
self.max_n_days = config_model['max_n_days'] # define time window
self.max_n_msgs = config_model['max_n_msgs'] # set max msg per day
self.max_n_words = config_model['max_n_words'] # set max words per msg
self.weight_init = config_model['weight_init'] #initilization setting
self.initializer = nn.init.xavier_uniform_ if self.weight_init == 'xavier-uniform' else nn.init.xavier_normal_ #initilization setting
self.word_embed_type = config_model['word_embed_type'] # word embeding model name
self.y_size = config_model['y_size'] #label dimentions
self.word_embed_size = config_model['word_embed_size'] # word embedding dimension
self.mel_cell_type = config_model['mel_cell_type'] # encoder type: lstm, gru
self.mel_h_size = config_model['mel_h_size'] #encoder state dimension
self.msg_embed_size = config_model['mel_h_size'] # attention output dimension
#dropout setting
self.dropout_train_mel_in = config_model['dropout_mel_in']
self.dropout_train_mel = config_model['dropout_mel']
self.dropout_train_ce = config_model['dropout_ce']
# Word Embedding
self.word_embedding = nn.Embedding(vocab_size, self.word_embed_size)
self.word_embedding.weight.data.copy_(init_word_table)
# RNN Layers
if self.mel_cell_type == 'ln-lstm':
self.rnn = nn.LSTM(self.word_embed_size, self.mel_h_size, batch_first=True, bidirectional=True)
elif self.mel_cell_type == 'gru':
self.rnn = nn.GRU(self.word_embed_size, self.mel_h_size, batch_first=True, bidirectional=True)
else:
self.rnn = nn.RNN(self.word_embed_size, self.mel_h_size, batch_first=True, bidirectional=True)
# Attention layers
self.attention_proj = nn.Linear(self.msg_embed_size, self.msg_embed_size, bias=False)
self.attention_weight = nn.Linear(self.msg_embed_size, 1, bias=False)
# Dropout layers
self.dropout_mel_in = nn.Dropout(self.dropout_train_mel_in)
self.dropout_mel = nn.Dropout(self.dropout_train_mel)
self.dropout_ce = nn.Dropout(self.dropout_train_ce)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def forward(self, word_input):
"""
Args:
word_input: Tensor of word indices [batch_size, max_n_days, max_n_msgs, max_n_words].
n_msgs_ph: Number of messages [batch_size, max_n_days].
Returns:
corpus_embed: Tensor of corpus embeddings [batch_size, max_n_days, corpus_embed_size].
"""
batch_size, max_n_days, max_n_msgs, max_n_words = word_input.size()
word_embed = self.word_embedding(word_input) # [batch_size, max_n_days, max_n_msgs, max_n_words, embed_size]
# print(word_embed.size())
word_embed = self.dropout_mel_in(word_embed)
word_embed = word_embed.view(-1, max_n_words, self.word_embed_size) # Flatten for RNN input
rnn_out, _ = self.rnn(word_embed) # [batch_size * max_n_msgs, max_n_words, 2 * mel_h_size]
# Separate forward and backward outputs
rnn_out_forward = rnn_out[..., :self.mel_h_size]
rnn_out_backward = rnn_out[..., self.mel_h_size:]
# Average forward and backward outputs
rnn_out = (rnn_out_forward + rnn_out_backward) / 2
# print("rnn_out", rnn_out.size())
rnn_out = rnn_out.contiguous().view(batch_size, max_n_days, max_n_msgs, -1) # Reshape back
# print("rnn_out", rnn_out.size())
self.attention_proj = nn.Linear(rnn_out.shape[-1], self.msg_embed_size, bias=False).to(self.device)
rnn_out = rnn_out.to(self.device)
attention_score = self.attention_proj(rnn_out).tanh()
attention_weight = self.attention_weight(attention_score).squeeze(-1)
attention_weight = F.softmax(attention_weight, dim=-1) # Apply softmax over messages
corpus_embed = torch.einsum('bijk,bij->bik', rnn_out, attention_weight) # Weighted sum
# corpus_embed = self.dropout_ce(corpus_embed)
return corpus_embed
変分運動デコーダ (VMD): 潜在変数を推定し、株価変動をデコード。
VMD層では、潜在状態の表現力を強化するために、生成モデルであるVAE(変分オートエンコーダ)が使用されます。潜在状態𝑧と予測値yは、入力𝑥から推論およびデコードされます。まず、GRUを用いて隠れ状態ℎを推論します。次に、潜在変数𝑧をサンプリングするための事前分布と事後分布を近似するプロセスでは、ニューラルネットワークと再パラメータ化を利用して変分推論を簡単に実行できます。近似された事後分布から得られる𝑧を使用し、株式の動きの情報を含む隠れ状態ℎとコーパス埋め込み𝑥を組み合わせて、新たな潜在状態 𝑔を投影します。最後に、すべての時刻における𝑔を用いて、アテンションメカニズムを用いた時系列補助として株式の動き𝑦𝑡を予測します。
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
from torch.distributions import kl_divergence
from ConfigLoader import *
class VMD(nn.Module):
def __init__(self):
super(VMD, self).__init__()
self.is_training_phase = None # training, dev, test
self.h_size = config_model['h_size'] # hidden state h dimension
self.z_size = config_model['h_size'] # latent state z dimension
self.g_size = config_model['g_size'] # latent state g dimension
self.y_size = config_model['y_size'] # label dimension
# dropout setting
self.dropout_vmd_in = config_model['dropout_vmd_in']
self.dropout_vmd = config_model['dropout_vmd']
# hidden state type: lstm,gru
self.vmd_cell_type = config_model['vmd_cell_type']
#whether using hidden state hz
self.vmd_rec = config_model['vmd_rec']
#whether using attention process
self.daily_att = config_model['daily_att']
#Which model use in VMD: generative or discriminative
self.variant_type = config_model['variant_type']
#initilization setting
uniform = True
self.initializer = nn.init.xavier_uniform_ if uniform else nn.init.xavier_normal_
self.bias_initializer = lambda bias: nn.init.constant_(bias, 0.0)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def _linear(self, args, output_size, activation=None, use_bias=True, use_bn=False):
# if not a tuple or list, make it a list
if not isinstance(args, (list, tuple)):
args = [args]
# get dimensions
sizes = [a.size(-1) for a in args]
total_arg_size = sum(sizes)
# concate tensor
x = args[0] if len(args) == 1 else torch.cat(args, dim=-1)
# initialize weight
weight = torch.empty(total_arg_size, output_size, dtype=x.dtype, device=x.device)
self.initializer(weight) # provide initialized method
weight = nn.Parameter(weight) # change to model para
# linear transformation
res = torch.matmul(x, weight)
# if using bias
if use_bias:
bias = torch.empty(output_size, dtype=x.dtype, device=x.device)
self.bias_initializer(bias)
bias = nn.Parameter(bias)
res += bias
# if using Batch Normalization
if use_bn:
bn = nn.BatchNorm1d(res.size(-1))
if self.is_training_phase:
res = bn(res)
# apply activation function
if activation == 'tanh':
res = torch.tanh(res)
elif activation == 'sigmoid':
res = torch.sigmoid(res)
elif activation == 'relu':
res = F.relu(res)
elif activation == 'softmax':
res = F.softmax(res, dim=-1)
return res
# for reparameterization
def _z(self, arg, is_prior):
#calc. mean and std
mean = self._linear(arg, self.z_size)
stddev = self._linear(arg, self.z_size)
stddev = torch.sqrt(torch.exp(stddev))
# add noise
epsilon = torch.randn(self.batch_size, self.z_size, device=mean.device)
# if the prior, only use mean
z = mean if is_prior else mean + stddev * epsilon
# define gassian distribution
pdf_z = Normal(loc=mean, scale=stddev)
return z, pdf_z
#Without hidden state hz
def _create_vmd_with_h_rec(self):
x = F.dropout(self.x, p=self.dropout_vmd_in) # Dropout
x = x.permute(1, 0, 2) # max_n_days * batch_size * x_size
y_ = self.y_ph.permute(1, 0, 2) # max_n_days * batch_size * y_size
# generate mask
self.mask_aux_trading_days = torch.arange(self.max_n_days, device=self.T_ph.device).expand(len(self.T_ph),
self.max_n_days) < (
self.T_ph - 1).unsqueeze(1).bool()
# initialization
h_s = torch.zeros(self.max_n_days, self.batch_size, self.h_size, device=x.device)
z_prior = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
z_post = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
kl = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
# Iterate all timestep
for t in range(self.max_n_days):
if t == 0:
# initialize h_s and z
h_s_t_1 = torch.tanh(torch.randn(self.batch_size, self.h_size, device=x.device))
z_t_1 = torch.tanh(torch.randn(self.batch_size, self.z_size, device=x.device))
else:
h_s_t_1 = h_s[t - 1]
z_t_1 = z_post[t - 1]
# Update hidden state
gate_args = torch.cat([x[t], h_s_t_1, z_t_1], dim=-1)
r = self._linear(gate_args, self.h_size, activation="sigmoid")
u = self._linear(gate_args, self.h_size, activation="sigmoid")
h_args = torch.cat([x[t], r * h_s_t_1, z_t_1], dim=-1)
h_tilde = self._linear(h_args, self.h_size, activation="tanh")
h_s_t = (1 - u) * h_s_t_1 + u * h_tilde
# calc. prior and posterior of z
h_z_prior_t = self._linear(torch.cat([x[t], h_s_t], dim=-1), self.z_size, activation="tanh")
z_prior_t, z_prior_pdf = self._z(h_z_prior_t, is_prior=True)
h_z_post_t = self._linear(torch.cat([x[t], h_s_t, y_[t]], dim=-1), self.z_size, activation="tanh")
z_post_t, z_post_pdf = self._z(h_z_post_t, is_prior=False)
# KL div. calc.
kl_t = kl_divergence(z_post_pdf, z_prior_pdf)
# keep state
h_s[t] = h_s_t
z_prior[t] = z_prior_t
z_post[t] = z_post_t
kl[t] = kl_t
# recover tensor
h_s = h_s.permute(1, 0, 2) # batch_size * max_n_days * h_size
z_prior = z_prior.permute(1, 0, 2) # batch_size * max_n_days * z_size
z_post = z_post.permute(1, 0, 2) # batch_size * max_n_days * z_size
kl = kl.permute(1, 0, 2) # batch_size * max_n_days * z_size
self.kl = kl.sum(dim=-1) # batch_size * max_n_days
# generate g , y
self.g = self._linear(torch.cat([x.permute(1, 0, 2), h_s, z_post], dim=-1), self.g_size, activation="tanh")
self.y = self._linear(self.g, self.y_size, activation="softmax")
# extract last timestep
sample_index = torch.arange(self.batch_size, device=x.device).unsqueeze(1)
self.indexed_T = torch.cat([sample_index, (self.T_ph - 1).unsqueeze(1)], dim=1)
# self.g_T = self.g[torch.arange(self.batch_size), self.T_ph - 1]
# self.y_T = self.y[torch.arange(self.batch_size), self.T_ph - 1] if not self.daily_att else None
def infer_func():
g_T = self.g_T = self.g[torch.arange(self.batch_size), self.T_ph - 1] # batch_size * g_size
if not self.daily_att:
y_T = self.self.y[torch.arange(self.batch_size), self.T_ph - 1] # batch_size * y_size
return g_T, y_T
return g_T
def gen_func():
# Use prior for g
z_prior_T = z_prior[torch.arange(self.batch_size), self.T_ph - 1] # batch_size * z_size
h_s_T = h_s[torch.arange(self.batch_size), self.T_ph - 1]
x_T = x[torch.arange(self.batch_size), self.T_ph - 1]
g_T = self._linear(torch.cat([x_T, h_s_T, z_prior_T], dim=-1), self.g_size)
if not self.daily_att:
y_T = F.softmax(self._linear(g_T, self.y_size), dim=-1)
return g_T, y_T
return g_T
self.g_T = infer_func() if self.is_training_phase else gen_func()
#with hidden state hz
def _create_vmd_with_zh_rec(self):
"""
Create a variational movement decoder.
x: [batch_size, max_n_days, vmd_in_size]
=> vmd_h: [batch_size, max_n_days, vmd_h_size]
=> z: [batch_size, max_n_days, vmd_z_size]
=> y: [batch_size, max_n_days, 2]
"""
# Dropout
x = F.dropout(self.x, p=self.dropout_vmd_in)
# Mask for auxiliary trading days
self.mask_aux_trading_days = torch.arange(self.max_n_days, device=self.T_ph.device).expand(len(self.T_ph),
self.max_n_days) < (self.T_ph - 1).unsqueeze(1).bool()
# Initialize RNN
if self.vmd_cell_type == 'ln-lstm':
rnn = nn.LSTM(self.x_size, self.h_size, batch_first=True, dropout=self.dropout_vmd).to(self.device)
else:
rnn = nn.GRU(self.x_size, self.h_size, batch_first=True, dropout=self.dropout_vmd).to(self.device)
# rnn = nn.Dropout(p=self.dropout_vmd)(rnn)
# Calculate vmd_h
packed_input = nn.utils.rnn.pack_padded_sequence(
x.to("cpu"), self.T_ph.cpu(), batch_first=True, enforce_sorted=False
).to(self.device)
packed_input = packed_input.to(self.device)
h_s_packed, _ = rnn(packed_input)
h_s, _ = nn.utils.rnn.pad_packed_sequence(h_s_packed, batch_first=True, total_length=x.shape[1])
# check shape
# print(f"h_s.shape: {h_s.shape}")
assert h_s.shape[1] == 5, f"Expected seq_len=5, but got {h_s.shape[1]}"
# Transpose for time-step-wise processing
x = x.permute(1, 0, 2) # [max_n_days, batch_size, x_size]
h_s = h_s.permute(1, 0, 2) # [max_n_days, batch_size, h_size]
y_ = self.y_ph.permute(1, 0, 2) # [max_n_days, batch_size, y_size]
# Initialize tensors for z_prior, z_post, and kl
z_prior = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
z_post = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
kl = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
# print(x.shape)
# print(h_s.shape)
# Loop through each time step
for t in range(self.max_n_days):
if t == 0:
z_post_t_1 = self._linear(torch.cat([h_s[t], z_post[t - 1]], dim=-1), self.z_size, activation="tanh") #initialize z_post_t_1
else:
z_post_t_1 = z_post[t - 1]
assert x.shape[0] == 5, f"{x.shape} ,{x}"
assert h_s.shape[0] == 5, f"{h_s.shape} ,{h_s}"
# Prior and posterior computations
h_z_prior_t = self._linear(torch.cat([x[t,:,:], h_s[t,:,:], z_post_t_1], dim=-1), self.z_size, activation="tanh")
z_prior_t, z_prior_pdf = self._z(h_z_prior_t, is_prior=True)
h_z_post_t = self._linear(torch.cat([x[t,:,:], h_s[t,:,:], y_[t,:,:], z_post_t_1], dim=-1), self.z_size,
activation="tanh")
z_post_t, z_post_pdf = self._z(h_z_post_t, is_prior=False)
kl_t = kl_divergence(z_post_pdf, z_prior_pdf)
# Store results
z_prior[t] = z_prior_t
z_post[t] = z_post_t
kl[t] = kl_t
# Transpose back to batch-first format
h_s = h_s.permute(1, 0, 2) # [batch_size, max_n_days, h_size]
z_prior = z_prior.permute(1, 0, 2) # [batch_size, max_n_days, z_size]
z_post = z_post.permute(1, 0, 2) # [batch_size, max_n_days, z_size]
kl = kl.permute(1, 0, 2) # [batch_size, max_n_days, z_size]
x = x.permute(1, 0, 2)
self.kl = kl.sum(dim=-1) # [batch_size, max_n_days]
# Compute g and y
self.g = self._linear(torch.cat([h_s, z_post], dim=-1), self.g_size, activation="tanh")
self.y = self._linear(self.g, self.y_size, activation="softmax")
# Extract the final g_T and y_T
sample_index = torch.arange(self.batch_size, device=x.device).unsqueeze(1)
self.indexed_T = torch.cat([sample_index, (self.T_ph - 1).unsqueeze(1)], dim=1)
# self.g_T = self.g[torch.arange(self.batch_size), self.T_ph - 1]
# self.y_T = self.y[torch.arange(self.batch_size), self.T_ph - 1] if not self.daily_att else None
def infer_func():
# g_T = self.g.gather(1, self.indexed_T.unsqueeze(-1).expand(-1, -1, self.g.size(-1))) # batch_size * g_size
g_T = self.g[torch.arange(self.batch_size), self.T_ph - 1]
# print("g_T shape", g_T.shape)
if not self.daily_att:
y_T = self.y[torch.arange(self.batch_size), self.T_ph - 1] # batch_size * y_size
return g_T, y_T
return g_T
def gen_func():
# Use prior for g
# assert max(self.T_ph-1) <= 5, f"{self.T_ph}"
z_prior_T = z_prior[torch.arange(self.batch_size), self.T_ph - 1] # batch_size * z_size
h_s_T = h_s[torch.arange(self.batch_size), self.T_ph - 1]
x_T = x[torch.arange(self.batch_size), self.T_ph - 1]
g_T = self._linear(torch.cat([x_T, h_s_T, z_prior_T], dim=-1), self.g_size)
if not self.daily_att:
y_T = F.softmax(self._linear(g_T, self.y_size), dim=-1)
return g_T, y_T
return g_T
self.g_T = infer_func() if self.is_training_phase else gen_func()
def _create_discriminative_vmd(self):
"""
Create a discriminative movement decoder.
x: [batch_size, max_n_days, vmd_in_size]
=> vmd_h: [batch_size, max_n_days, vmd_h_size]
=> z: [batch_size, max_n_days, vmd_z_size]
=> y: [batch_size, max_n_days, 2]
"""
# Dropout on input
x = F.dropout(self.x, p=self.dropout_vmd_in)
x = x.permute(1, 0, 2) # max_n_days * batch_size * x_size
# Mask for auxiliary trading days
self.mask_aux_trading_days = torch.arange(self.max_n_days, device=self.T_ph.device).expand(len(self.T_ph),
self.max_n_days) < (
self.T_ph - 1).unsqueeze(1)
# Initialize RNN
if self.vmd_cell_type == 'ln-lstm':
rnn = nn.LSTM(self.h_size, self.h_size, batch_first=True)
else:
rnn = nn.GRU(self.h_size, self.h_size, batch_first=True)
# Forward RNN
packed_input = nn.utils.rnn.pack_padded_sequence(x, self.T_ph.cpu(), batch_first=True, enforce_sorted=False)
h_s_packed, _ = rnn(packed_input)
h_s, _ = nn.utils.rnn.pad_packed_sequence(h_s_packed, batch_first=True)
# Transpose for time-step-wise processing
x = x.permute(1, 0, 2) # [max_n_days, batch_size, x_size]
h_s = h_s.permute(1, 0, 2) # [max_n_days, batch_size, h_size]
# Initialize z tensor
z = torch.zeros(self.max_n_days, self.batch_size, self.z_size, device=x.device)
# Loop through each time step
for t in range(self.max_n_days):
if t == 0:
z_t_1 = torch.randn(self.batch_size, self.z_size, device=x.device) # Random initialization
else:
z_t_1 = z[t - 1]
# Compute h_z and z
h_z_t = self._linear(torch.cat([x[t], h_s[t], z_t_1], dim=-1), self.z_size, activation="tanh")
z_t = self._linear(h_z_t, self.z_size, activation="tanh")
# Store z_t
z[t] = z_t
# Transpose back to batch-first format
h_s = h_s.permute(1, 0, 2) # [batch_size, max_n_days, h_size]
z = z.permute(1, 0, 2) # [batch_size, max_n_days, z_size]
# Compute g and y
self.g = self._linear(torch.cat([h_s, z], dim=-1), self.g_size, activation="tanh")
self.y = self._linear(self.g, self.y_size, activation="softmax")
# Extract g_T
self.g_T = self.g[torch.arange(self.batch_size, device=x.device), self.T_ph - 1]
def forward(self, x_, y_batch, T):
self.x = x_
self.y_ph = y_batch
self.T_ph = T
self.batch_size, self.max_n_days, self.x_size = self.x.shape
if self.variant_type == 'discriminative':
self._create_discriminative_vmd()
else:
if self.vmd_rec == 'h':
self._create_vmd_with_h_rec()
else:
self._create_vmd_with_zh_rec()
return self.g, self.g_T, self.y, self.kl, self.T_ph, self.mask_aux_trading_days
時間的補助モジュール (ATA): 時系列データの損失を統合し、モデルの学習を強化。
ATA層では、入力された $[g_1, g_2, \dots, g_{T-1}]$とその投影$[y_1, y_2, \dots, y_{T-1}]$を使用して、情報スコアと依存スコアを算出します。そして、それらに要素ごとのドット積(点積)を適用することでアテンシメカニズムの処理を行います。このプロセスにより、異なる全体的な依存重みを持つことが可能になります。各ウィンドウ内の最後のタイムステップ$𝑔_𝑇$は、主目標である 𝑦𝑇を算出するために使用され、この場合アテンシメカニズムは使用されません。
import torch
import torch.nn as nn
import torch.nn.functional as F
from ConfigLoader import *
class ATA(nn.Module):
def __init__(self):
super(ATA, self).__init__()
self.g_size = config_model['g_size'] # latent state g dimension
self.y_size = config_model['y_size'] # label dimension
self.alpha = config_model['alpha'] # coefficient controling attentive temporal auxiliary
#kl annealing setting
self.kl_lambda_anneal_rate = config_model['kl_lambda_anneal_rate']
self.kl_lambda_start_step = config_model['kl_lambda_start_step']
self.use_constant_kl_lambda = config_model['use_constant_kl_lambda']
self.constant_kl_lambda = config_model['constant_kl_lambda']
#whether using attention
self.daily_att = config_model['daily_att']
#Which model use in VMD: generative or discriminative
self.variant_type = config_model['variant_type']
# Layers for temporal attention
self.linear_v_i = nn.Linear(self.g_size, self.g_size, bias=False)
self.linear_v_d = nn.Linear(self.g_size, self.g_size, bias=False)
self.attention_weight = nn.Parameter(torch.Tensor(self.g_size, 1))
nn.init.xavier_uniform_(self.attention_weight)
# Layers for generating final predictions
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.linear_final = nn.Linear(self.g_size, self.y_size)
self.softmax = nn.Softmax(dim=-1)
def temporal_attention(self, g, g_T, mask_aux_trading_days, y):
"""
Temporal attention mechanism.
Args:
g: Tensor of shape [batch_size, max_n_days, g_size].
g_T: Tensor of shape [batch_size, g_size].
mask_aux_trading_days: Mask for auxiliary trading days [batch_size, max_n_days].
Returns:
v_stared: Attention scores [batch_size, max_n_days].
att_c: Context vector [batch_size, g_size].
"""
proj_i = torch.tanh(self.linear_v_i(g)) # [batch_size, max_n_days, g_size]
v_i = torch.matmul(proj_i, self.attention_weight).squeeze(-1) # [batch_size, max_n_days]
proj_d = torch.tanh(self.linear_v_d(g)) # [batch_size, max_n_days, g_size]
g_T = g_T.unsqueeze(-1) # [batch_size, g_size, 1]
v_d = torch.matmul(proj_d, g_T).squeeze(-1) # [batch_size, max_n_days]
aux_score = v_i * v_d # [batch_size, max_n_days]
aux_score = torch.where(mask_aux_trading_days, aux_score, torch.full_like(aux_score, float('-inf')))
v_stared = F.softmax(aux_score, dim=-1) # [batch_size, max_n_days]
v_stared = torch.nan_to_num(v_stared, nan=0.0)
context = g.permute(0, 2, 1) if self.daily_att != 'y' else y.permute(0, 2, 1) # Context based on g or y
# print(context.shape)
# print(v_stared.unsqueeze(1).permute(0, 2, 1).shape)
att_c = torch.matmul(context, v_stared.unsqueeze(1).permute(0, 2, 1)).squeeze(2) # [batch_size, g_size]
return v_stared, att_c
def generative_loss(self, y_ph, y, kl, v_stared, y_T, T_ph, kl_lambda):
"""
Compute the generative loss.
Args:
y_ph: Ground truth labels [batch_size, max_n_days, y_size].
y: Predicted labels [batch_size, max_n_days, y_size].
kl: KL divergence [batch_size, max_n_days].
v_stared: Attention scores [batch_size, max_n_days].
g_T: Final latent state [batch_size, g_size].
y_T: Final predicted label [batch_size, y_size].
indexed_T: Indices of the final timestep [batch_size].
kl_lambda: Weight for KL divergence.
Returns:
loss: Scalar loss value.
"""
likelihood_aux = (y_ph * torch.log(y + 1e-7)).sum(dim=-1) # [batch_size, max_n_days]
obj_aux = likelihood_aux - kl_lambda * kl # [batch_size, max_n_days]
# print("y_ph shape:", y_ph.shape) #
# print("self.batch_size:", self.batch_size)
# print("y_ph:", y_ph) #
# likelihood_T = (y_ph.gather(1, indexed_T.unsqueeze(-1).expand(-1, -1, self.y_size)) *
# torch.log(y_T + 1e-7)).sum(dim=-1) # [batch_size]
likelihood_T = (y_ph[torch.arange(self.batch_size), T_ph - 1] *
torch.log(y_T + 1e-7)).sum(dim=-1) # [batch_size]
# kl_T = kl.gather(1, indexed_T.unsqueeze(-1)).squeeze(-1) # [batch_size]
kl_T = kl[torch.arange(self.batch_size), T_ph - 1]
obj_T = likelihood_T - kl_lambda * kl_T # [batch_size]
v_aux = self.alpha * v_stared # [batch_size, max_n_days]
obj = obj_T + (obj_aux * v_aux).sum(dim=-1) # [batch_size]
loss = -obj.mean() # Scalar loss value
return loss
def discriminative_loss(self, y_ph, y, v_stared, y_T, T_ph):
"""
Compute the discriminative loss.
Args:
y_ph: Ground truth labels [batch_size, max_n_days, y_size].
y: Predicted labels [batch_size, max_n_days, y_size].
v_stared: Attention scores [batch_size, max_n_days].
y_T: Final predicted label [batch_size, y_size].
indexed_T: Indices of the final timestep [batch_size].
Returns:
loss: Scalar loss value.
"""
likelihood_aux = (y_ph * torch.log(y + 1e-7)).sum(dim=-1) # [batch_size, max_n_days]
likelihood_T = (y_ph[torch.arange(self.batch_size), T_ph - 1] *
torch.log(y_T + 1e-7)).sum(dim=-1) # [batch_size]
v_aux = self.alpha * v_stared # [batch_size, max_n_days]
obj = likelihood_T + (likelihood_aux * v_aux).sum(dim=-1) # [batch_size]
loss = -obj.mean() # Scalar loss value
return loss
def kl_lambda(self, global_step):
"""
Compute the KL divergence weight.
"""
if global_step < self.kl_lambda_start_step:
return 0.0
if self.use_constant_kl_lambda:
return self.constant_kl_lambda
return min(self.kl_lambda_anneal_rate * global_step, 1.0)
def forward(self, g, g_T, y_ph, y, kl, mask_aux_trading_days, T_ph, global_step):
"""
Forward pass for training.
"""
self.batch_size = T_ph.shape[0]
v_stared, att_c = self.temporal_attention(g, g_T, mask_aux_trading_days, y)
kl_lambda = self.kl_lambda(global_step)
conbin_att = torch.cat((att_c, g_T), dim=-1).to(self.device)
self.linear_y_T = nn.Linear(conbin_att.shape[-1], self.y_size).to(self.device)
y_T = self.softmax(self.linear_y_T(conbin_att))
# print(indexed_T)
if self.variant_type == 'discriminative':
loss = self.discriminative_loss(y_ph, y, v_stared, y_T, T_ph)
else:
loss = self.generative_loss(y_ph, y, kl, v_stared, y_T, T_ph, kl_lambda)
return loss
モデル整合: 上記三つのコンポーネントを一つのモデルにします。
import torch
from Model_MIE import MIE
from Model_VMD import VMD
from Model_ATA import ATA
import torch.nn as nn
class Model(nn.Module):
def __init__(self, word_table_init):
super(Model, self).__init__()
self.word_table_init = word_table_init
self.model_name = "StockNet"
self.global_step = torch.tensor(0, dtype=torch.int32)
self.tf_graph_path = "../log/train"
self.tf_checkpoint_file_path = "../checkpoint/3.pth"
self.n_epochs = 100
# Initilizaze three sub-models
self.mie = MIE(self.word_table_init)
self.vmd = VMD()
self.ata = ATA()
def forward(self, inputs, is_training_phase):
# check input
assert isinstance(inputs, dict)
assert isinstance(is_training_phase, bool)
# MIE
corpu_emd = self.mie(inputs['word_batch'])
mie_output = torch.cat((inputs['price_batch'], corpu_emd), dim=2)
self.vmd.is_training_phase = is_training_phase
# VMD
g, g_T, y_pred, kl, T_ph, mask_aux_trading_days = self.vmd(
mie_output, inputs['y_batch'], inputs['T_batch']
)
# ATA
loss = self.ata(g, g_T, inputs['y_batch'], y_pred, kl, mask_aux_trading_days, T_ph, self.global_step)
# update global steps
self.global_step += 1
return inputs['y_batch'], y_pred, loss
結果とディスカッション
原論文では、評価指標としてaccとmccが使用されています。過去の価格データとテキストデータを用い、時系列補助付き生成モデルを採用した際、そのaccとmccは当時のSOTAに達し、それぞれ58.23と0.08でした。しかし、今回再構築したコードの結果は、accが40.38、mccが0.07となりました。
私は元の結果を再現することができず、その原因は2つあると考えています。1つ目は、モデルのアーキテクチャが同じであっても、異なる第三者ライブラリ(PyTorch)を使用することで、内部の実装方法が結果に影響を及ぼすことです。また、通常バージョンが更新されるにつれ、古いバージョンと新しいバージョンでは訓練結果が異なる場合もあります。2つ目は、原論文のコードにランダムシード(random seed)が定義されておらず、実行するたびに結果が一致しないことです。
外的要因を無視し、モデルアーキテクチャ自体を考察すると、このモデルの最大の特徴は損失関数にアテンションメカニズムを導入し、通常の時系列予測生成モデルでは重視されないlatent stateの$t<T$部分に着目して情報の全体的な活用を向上させた点です。
しかし、本質的にはテキスト情報を全体として処理しており、株価予測においては、テキスト情報が株価の時系列データよりも多くの情報を含んでいる一方で、その粒度が大きく、ノイズも多い傾向があります。実際、論文では純粋なテキスト予測と過去の価格データ+テキスト予測のacc精度が同じであることが示されています。つまり、テキストデータを追加しても精度の大きな向上には繋がっていないと考えられます。これは、テキストデータ内の情報をより巧みに活用できていないことが原因と推測されます。この課題に関連して、派生した手法がイベント駆動型の時系列予測です。これは、テキストからイベントを抽出して予測を行う手法です。例えばEDTモデル(Zhou et. al., 2021)は一時的にaccを75.67まで引き上げました。しかし、別の論文(Meiyun Wang et. al., 2024)では、EDTモデルはStockNetデータセット上での性能が低く、accはわずか40にとどまりました。このことから、イベント駆動型のテキストデータ融合が株価予測において汎用性を持つかどうかを判断するのは難しいと言わざるを得ません。
最近の研究によります(Meiyun Wang et. al., 2024)と、大規模言語モデル(LLM)を活用したテキスト融合型の株価動向予測は良好な結果を示しており、これらはプロンプトエンジニアリングに基づく手法です。しかし、これらの手法も過去の株価データを融合しておらず、本質的には純粋なテキスト予測です。私は、過去の株価データとテキストデータを用いて予測する際、テキストが過去の株価変動をうまく補正できることを期待していますが、その実現にはまだ長い道のりがあると感じています。