TensorFlow Framework in Practice - Lesson 5: Recurrent Neural Networks and Applications


Lesson 5: Recurrent Neural Networks and Applications

Word embeddings

The skip-gram model

import tensorflow as tf


class SkipGramModel:
    def __init__(self, vocab_size, embed_size, batch_size, num_sampled, learning_rate):
        """ Initialize member variables """
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.batch_size = batch_size
        self.num_sampled = num_sampled
        self.lr = learning_rate
        self.global_step = tf.Variable(0, dtype=tf.int32, trainable=False, name='global_step')

    def _create_placeholders(self):
        """ 定义容易用于存储数据 """
        with tf.name_scope("data"):
            self.center_words = tf.placeholder(tf.int32, shape=[self.batch_size], name='center_words')
            self.target_words = tf.placeholder(tf.int32, shape=[self.batch_size, 1], name='target_words')

    def _create_embedding(self):
        """ 设置权重(注意word2vec里实际上这个权重矩阵里,每一行(或者列)就是我们的词向量,所以很重要) """
        # 在CPU上跑
        with tf.device('/cpu:0'):
            with tf.name_scope("embed"):
                self.embed_matrix = tf.Variable(tf.random_uniform([self.vocab_size, 
                                                                    self.embed_size], -1.0, 1.0),
                                                                    name='embed_matrix')

    def _create_loss(self):
        """ 定义word2vec结果,同时定义损失函数 """
        with tf.device('/cpu:0'):
            with tf.name_scope("loss"):
                # 其实就是一个简单的查表
                embed = tf.nn.embedding_lookup(self.embed_matrix, self.center_words, name='embed')

                # 定义损失函数,通常词表很大,我们会用层次化softmax或者negative sampling
                nce_weight = tf.Variable(tf.truncated_normal([self.vocab_size, self.embed_size],
                                                            stddev=1.0 / (self.embed_size ** 0.5)),
                                                            name='nce_weight')
                nce_bias = tf.Variable(tf.zeros([VOCAB_SIZE]), name='nce_bias')

                self.loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weight,
                                                    biases=nce_bias,
                                                    labels=self.target_words,
                                                    inputs=embed,
                                                    num_sampled=self.num_sampled,
                                                    num_classes=self.vocab_size), name='loss')

    def _create_optimizer(self):
        """ Set up the optimizer """
        with tf.device('/cpu:0'):
            self.optimizer = tf.train.GradientDescentOptimizer(self.lr).minimize(self.loss, 
                                                              global_step=self.global_step)

    def _create_summaries(self):
        """ 设定summary,以便在Tensorboard里进行可视化 """
        with tf.name_scope("summaries"):
            tf.summary.scalar("loss", self.loss)
            tf.summary.histogram("histogram loss", self.loss)
            # Several summaries, so merge them all here
            self.summary_op = tf.summary.merge_all()

    def build_graph(self):
        """ 构建整个图的Graph """
        self._create_placeholders()
        self._create_embedding()
        self._create_loss()
        self._create_optimizer()
        self._create_summaries()
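
A quick usage sketch: the class above only builds the graph, so a training loop is still needed on top of it. The batch_gen generator and the constant values below are assumptions standing in for whatever data pipeline and hyperparameters the course actually uses.

VOCAB_SIZE = 50000
EMBED_SIZE = 128
BATCH_SIZE = 128
NUM_SAMPLED = 64
LEARNING_RATE = 1.0
NUM_TRAIN_STEPS = 10000

model = SkipGramModel(VOCAB_SIZE, EMBED_SIZE, BATCH_SIZE, NUM_SAMPLED, LEARNING_RATE)
model.build_graph()

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    writer = tf.summary.FileWriter('./graphs/skip_gram', sess.graph)
    for step in range(NUM_TRAIN_STEPS):
        # batch_gen is an assumed generator yielding (centers, targets) pairs:
        # centers with shape (BATCH_SIZE,), targets with shape (BATCH_SIZE, 1)
        centers, targets = next(batch_gen)
        loss_batch, _, summary = sess.run(
            [model.loss, model.optimizer, model.summary_op],
            feed_dict={model.center_words: centers, model.target_words: targets})
        writer.add_summary(summary, global_step=step)
    writer.close()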


Recurrent neural networks / Long short-term memory

LSTM
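The session below refers to x, y, cost, optimizer, accuracy and init without defining them, so here is a minimal sketch of a graph that matches those names: an LSTM that reads each MNIST image as 28 rows of 28 pixels. The hyperparameter values (n_hidden = 128, AdamOptimizer, and so on) are assumptions for illustration, not necessarily the course's exact settings.

import tensorflow as tf
from tensorflow.contrib import rnn
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets('/tmp/data/', one_hot=True)

learning_rate = 0.001
training_iters = 100000
batch_size = 128
display_step = 10

n_input = 28      # each 28-pixel row of an MNIST image is one time step
n_steps = 28      # 28 rows => 28 time steps
n_hidden = 128    # number of LSTM units (assumed)
n_classes = 10    # ten digit classes

x = tf.placeholder(tf.float32, [None, n_steps, n_input])
y = tf.placeholder(tf.float32, [None, n_classes])

weights = tf.Variable(tf.random_normal([n_hidden, n_classes]))
biases = tf.Variable(tf.random_normal([n_classes]))

# Unstack the input into a length-n_steps list of (batch_size, n_input) tensors
inputs = tf.unstack(x, n_steps, 1)
lstm_cell = rnn.BasicLSTMCell(n_hidden, forget_bias=1.0)
outputs, states = rnn.static_rnn(lstm_cell, inputs, dtype=tf.float32)

# Classify from the output at the last time step
pred = tf.matmul(outputs[-1], weights) + biases
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

init = tf.global_variables_initializer()
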
with tf.Session() as sess:
    sess.run(init)
    writer = tf.summary.FileWriter('./graphs/LSTM_MNIST', sess.graph)
    step = 1
    # Keep training until the specified number of iterations is reached
    while step * batch_size < training_iters:
        batch_x, batch_y = mnist.train.next_batch(batch_size)
        # Reshape the data into sequences of 28 steps with 28 elements each
        batch_x = batch_x.reshape((batch_size, n_steps, n_input))
        # Run one optimization step
        sess.run(optimizer, feed_dict={x: batch_x, y: batch_y})
        if step % display_step == 0:
            # Compute accuracy on this batch
            acc = sess.run(accuracy, feed_dict={x: batch_x, y: batch_y})
            # Compute the loss on this batch
            loss = sess.run(cost, feed_dict={x: batch_x, y: batch_y})
            print("Iter " + str(step*batch_size) + ", Minibatch Loss= " + \
                  "{:.6f}".format(loss) + ", Training Accuracy= " + \
                  "{:.5f}".format(acc))
        step += 1
    print("Optimization Finished!")
    writer.close()

    test_len = 128
    test_data = mnist.test.images[:test_len].reshape((-1, n_steps, n_input))
    test_label = mnist.test.labels[:test_len]
    print("Testing Accuracy:", \
        sess.run(accuracy, feed_dict={x: test_data, y: test_label}))


Chinese text classification with an RNN

import os, sys
import time

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.contrib import learn
from tensorflow.contrib import rnn


class Model():
    def __init__(self, args, deterministic=False):
        self.args = args

        if args.model == 'rnn':
            cell_fn = rnn.BasicRNNCell
        elif args.model == 'gru':
            cell_fn = rnn.GRUCell
        elif args.model == 'lstm':
            cell_fn = rnn.BasicLSTMCell
        else:
            raise Exception('model type not supported: {}'.format(args.model))

        # Create one cell instance per layer; reusing a single instance across
        # layers breaks variable scoping in newer TF 1.x releases
        cells = [cell_fn(args.rnn_size) for _ in range(args.num_layers)]
        self.cell = rnn.MultiRNNCell(cells)

        self.input_data = tf.placeholder(tf.int64, [None, args.seq_length])
        # self.targets = tf.placeholder(tf.int64, [None, args.seq_length])  # seq2seq model
        self.targets = tf.placeholder(tf.int64, [None, ])  # target is class label

        with tf.variable_scope('embeddingLayer'):
            with tf.device('/cpu:0'):
                W = tf.get_variable('W', [args.vocab_size, args.rnn_size])
                embedded = tf.nn.embedding_lookup(W, self.input_data)

                # shape: (batch_size, seq_length, rnn_size) => a list of seq_length tensors of shape (batch_size, rnn_size)
                inputs = tf.split(embedded, args.seq_length, 1)
                inputs = [tf.squeeze(input_, [1]) for input_ in inputs]

        outputs, last_state = rnn.static_rnn(self.cell, inputs, dtype=tf.float32, scope='rnnLayer')

        with tf.variable_scope('softmaxLayer'):
            softmax_w = tf.get_variable('w', [args.rnn_size, args.label_size])
            softmax_b = tf.get_variable('b', [args.label_size])
            logits = tf.matmul(outputs[-1], softmax_w) + softmax_b
            self.probs = tf.nn.softmax(logits)

        # self.cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=self.targets))  # Softmax loss
        self.cost = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=self.targets))  # Softmax loss
        self.final_state = last_state
        self.lr = tf.Variable(0.0, trainable=False)
        self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.cost)  # Adam Optimizer

        self.correct_pred = tf.equal(tf.argmax(self.probs, 1), self.targets)
        self.correct_num = tf.reduce_sum(tf.cast(self.correct_pred, tf.float32))
        self.accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))


    def predict_label(self, sess, labels, text):
        x = np.array(text)
        feed = {self.input_data: x}
        probs, state = sess.run([self.probs, self.final_state], feed_dict=feed)

        results = np.argmax(probs, 1)
        id2labels = dict(zip(labels.values(), labels.keys()))
        # map() returns an iterator in Python 3, so materialize the list of label names
        return [id2labels.get(i) for i in results]


    def predict_class(self, sess, text):
        x = np.array(text)
        feed = {self.input_data: x}
        probs, state = sess.run([self.probs, self.final_state], feed_dict=feed)

        results = np.argmax(probs, 1)
        return results
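
A hedged usage sketch: the args values below and the random toy batch are illustrative assumptions, standing in for the real vocabulary, label set, and data loader.

from types import SimpleNamespace

args = SimpleNamespace(model='lstm', rnn_size=128, num_layers=2,
                       seq_length=20, vocab_size=5000, label_size=10)
model = Model(args)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(tf.assign(model.lr, 0.001))   # set the learning-rate variable used by the optimizer
    x_batch = np.random.randint(0, args.vocab_size, size=(32, args.seq_length))
    y_batch = np.random.randint(0, args.label_size, size=(32,))
    _, loss, acc = sess.run([model.optimizer, model.cost, model.accuracy],
                            feed_dict={model.input_data: x_batch, model.targets: y_batch})
    print('loss={:.4f}, acc={:.4f}'.format(loss, acc))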