import tensorflow as tf
from tensorflow.core.framework.embedding import config_pb2
storage_option = tf.StorageOption(storage_type=config_pb2.StorageType.DRAM_SSDHASH,
storage_path="/tmp/ssd_utpy",
storage_size=[512])
ev_opt = tf.EmbeddingVariableOption(storage_option=storage_option)
#通过get_embedding_variable接口使用
emb_var = tf.get_embedding_variable("var", embedding_dim = 16, ev_option=ev_opt)
#通过sparse_column_with_embedding接口使用
from tensorflow.contrib.layers.python.layers import feature_column
emb_var = feature_column.sparse_column_wth_embedding("var", ev_option=ev_opt)
#通过categorical_column_with_embedding接口使用
emb_var = tf.feature_column.categorical_column_with_embedding("var", ev_option=ev_opt)
下面是EmbeddingVariable多级存储的接口定义:
@tf_export(v1=["StorageOption"])
class StorageOption(object):
def __init__(self,
storage_type=None,
storage_path=None,
storage_size=[1024*1024*1024]):
self.storage_type = storage_type
self.storage_path = storage_path
self.storage_size = storage_size
参数解释:
stroage_type:使用的存储类型, 例如DRAM_SSD为使用DRAM和SSD作为embedding的存储,具体支持的存储类型会在第4节中给出
storage_path: 如果使用SSD存储,则需要配置该参数指定保存embedding数据的文件夹路径
storage_size: 指定每个层级可以使用的存储容量,单位是字节,例如对于DRAM+PMem要使用1GB DRAM和 10GB PMem,则配置为[102410241024, 1010241024*1024],默认是每级1GB,目前的实现中无法限制SSD的使用量
3.使用示例
使用get_embedding_variable接口
import tensorflow as tf
from tensorflow.core.framework.embedding import config_pb2
with tf.device('/cpu:0'): #目前只有CPU的EV算子支持使用多级存储
storage_option = tf.StorageOption(storage_type=config_pb2.StorageType.DRAM_SSDHASH,
storage_path="/tmp/ssd_utpy",
storage_size=[128])
ev_opt = tf.EmbeddingVariableOption(storage_option=storage_option)
var = tf.get_embedding_variable("var_0",
embedding_dim=3,
initializer=tf.ones_initializer(tf.float32),
ev_option=ev_opt)
emb = tf.nn.embedding_lookup(var, tf.cast([0,1,2,5,6,7], tf.int64))
fun = tf.multiply(emb, 2.0, name='multiply')
loss = tf.reduce_sum(fun, name='reduce_sum')
opt = tf.train.AdagradOptimizer(0.1)
g_v = opt.compute_gradients(loss)
train_op = opt.apply_gradients(g_v)
init = tf.global_variables_initializer()
sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
with tf.Session(config=sess_config) as sess:
sess.run([init])
print(sess.run([emb, train_op, loss]))
print(sess.run([emb, train_op, loss]))
print(sess.run([emb, train_op, loss]))
使用categorical_column_with_embedding接口的例子
import tensorflow as tf
from tensorflow.core.framework.embedding import config_pb2
with tf.device('/cpu:0'): #目前只有CPU的EV算子支持使用多级存储
storage_option = tf.StorageOption(storage_type=config_pb2.StorageType.DRAM_SSDHASH,
storage_path="/tmp/ssd_utpy",
storage_size=[128])
ev_opt = tf.EmbeddingVariableOption(storage_option=storage_option)
columns = tf.feature_column.categorical_column_with_embedding("col_emb", dtype=tf.dtypes.int64, ev_option=ev_opt)
W = tf.feature_column.embedding_column(categorical_column=columns,
dimension=3,
initializer=tf.ones_initializer(tf.dtypes.float32))
ids={}
ids["col_emb"] = tf.SparseTensor(indices=[[0,0],[1,1],[2,2],[3,3],[4,4]], values=tf.cast([1,2,3,4,5], tf.dtypes.int64), dense_shape=[5, 4])
emb = tf.feature_column.input_layer(ids, [W])
fun = tf.multiply(emb, 2.0, name='multiply')
loss = tf.reduce_sum(fun, name='reduce_sum')
opt = tf.train.AdagradOptimizer(0.1)
g_v = opt.compute_gradients(loss)
train_op = opt.apply_gradients(g_v)
init = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
print("init global done")
print(sess.run([emb, train_op,loss]))
print(sess.run([emb, train_op,loss]))
print(sess.run([emb, train_op,loss]))
EmbeddingVariable相关Op的device 与 Embedding多级存储首级类型必须一致,否则运行时报错。GPU op要求首级为HBM,CPU op要求首级为DRAM。
EmbeddingVariable的HBM单级实现目前暂时只支持部分基础功能。对于特征淘汰、特征准入、特征统计等功能暂未支持。对应的优化器现在提供了Adagrad以及FtrlOptimizer的支持。
以下是各种存储介质的说明:
HBM:GPU显存
DRAM:CPU内存
PMEM:持久化内存
LevelDB:基于LevelDB开发的SSD存储
SSDHASH:基于Hash索引的SSD存储,相比LevelDB实现,有更好的性能和内存稳定性。SSDHASH支持同步和异步两种compaction的方式。使用同步compaction时,向SSD写入数据和compaction将会使用同一个线程,异步时则各使用一个线程。
用户可以通过配置环境变量TF_SSDHASH_ASYNC_COMPACTION
选择使用哪种compaction方式,当TF_SSDHASH_ASYNC_COMPACTION=1时打开异步compaction功能;设置为0或不设置时使用同步compaction。
5.设置淘汰线程数量
为了减少使用多级存储带来的性能开销并且维持系统存储占用量稳定,多级存储会启动后台线程来异步地将数据写入到下级存储中。考虑到在一些场景中(例如在线serving场景)CPU资源紧张,因此多级存储中使用一个统一的线程池来管理系统中所有使用多级存储的EV,用户可以根据实际情况通过配置TF_MULTI_TIER_EV_EVICTION_THREADS
环境变量来设置线程池中的线程数。