记第一次推荐开发实践遇到的问题

type
Post
status
Published
summary
自从入行以来,从来没有重视过内存管理这块的东西,但这次真的是该遭的罪一点没落下的全体验了一遍。试问苍天饶过谁。由于预测时的全量用户为 1.5 亿,数据量实在太大,而且服务器上没有 GPU 资源,所以只能通过开启多进程的方式来进行预测。在 python 中开启多进程无果后,只能想办法曲线救国,最终通过 shell 脚本将数据拆分然后循环调用 python 脚本,这个过程中可以将拆分后的数据的起始行数和数据量通过参数传递给python,最终以这样的方式成功开启了多进程。但随后遇到了另外一个问题:内存溢出导致的部分进程被 kill。
slug
machine-learning-recommender-problem-recode
date
May 24, 2024
tags
机器学习
推荐系统
双塔模型
问题记录
category
机器学习
password
icon
URL
Property
Oct 16, 2024 03:02 AM
本次推荐算法实践,只开发了召回阶段的代码,使用的算法是经典的双塔模型(DSSM)。众所周知,理论知识和实际开发完全是两个不同的东西。在粗略的学习了相关的理论基础知识之后,我就开始投入到开发中了,因此也遇到了很多第一次遇到并且很懵的问题。不过也正是经历过这些问题之后才能感觉到自己的进步。所以将一些问题记录下来,加深记忆和回顾。
本次开发使用的是开源的三方库: Torch-RecHub

算法实践问题

问题一:

在处理数据时,需要将用户数据,物品数据与交互数据相连,得到一个最终的数据,但是由于大部分用户是没有交互数据的,所以连接之后大部分用户的交互相关字段和产品相关的字段会为空。这应该怎么处理?
答:数据源有三个,用户特征、产品特征、用户订购记录,连接的时候并不是将三份数据外链接。订购记录表中的数据可以理解为全是正样本(我们可以给它们新增一个label字段,值全为 1),因此可以直接在这部分数据上拼接上用户特征和产品特征;而负样本需要我们进行筛选和创造(具体的方法后续会介绍),得到负样本之后再拼接负样本的用户特征和产品特征(同样新增一个 label 字段,值全为 0)。 这样我们就得到了完整的训练数据。

问题二:

交互数据中每条数据有用户 ID,产品 ID,是否订购;此时对负样本进行采样,是只针对正样本的中的用户 ID 随机选择负样本产品还是需要对其他非正样本用户选择负样本产品?
答:只针对正样本中的用户 ID 选择负样本产品。如果为正样本中没有出现的用户选择负样本,那么可能会引入一些噪声,因为可能会为那些实际上可能对某些产品感兴趣的用户分配负样本。
只针对正样本中的用户 ID 选择负样本产品的好处:
  • 数据一致性:负样本与正样本来自相同的用户,保持了用户特征的一致性。
  • 减少噪声:为已知用户选择负样本可以减少数据中的噪声,提高模型的预测准确性。
  • 效率:针对已有正样本的用户生成负样本通常比对所有用户进行操作更加高效,因为它减少了需要处理的数据量。

问题三:

召回的测试样本中,需不需要负样本?
  • 如果需要评估召回质量则需要负样本:可以帮助你评估模型是否能有效地区分用户感兴趣和不感兴趣的项目。通过计算如精确率、召回率、F1分数等指标,可以更全面地了解召回模型的性能。
  • 如果只关注召回覆盖率则不一定需要负样本:如果主要目标是测试召回模型是否能覆盖到用户实际感兴趣的项目,那么负样本的存在不是必须的。通过看评分指标的公式即可得知,评估指标在计算时用不到负样本(负样本没什么用,并且只会增大评分指标的分母)(本次采用的方法
  • 实践建议:在实际应用中,包含负样本在测试集中可以提供更全面的评估,尤其是在召回和排序(精排)阶段结合使用的场景下。负样本可以帮助你评估模型是否只是随机召回项目,还是真的学到了用户的偏好。此外,包含负样本也有助于模拟真实场景,因为在实际的推荐系统中,用户会遇到他们不感兴趣的项目。

问题四:

torch-rechub 训练之后如何预测的问题
答:训练过程中torch-rechub库的MatchTrainer.fit 方法会自动将模型保存到指定文件夹,在调用MatchTrainer.inference_embedding方法时,会自动加载模型,只需要将用户特征的数据处理好,喂给模型的MatchTrainer.inference_embedding方法,就会生成对应的用户 embedding,然后直接进行检索就好了。

问题五:

数据变动引发的报错(某个类别特征的类别数量变动,编码超过嵌入维度,预测时配置特征维度与训练时不统一)
  • 问题背景:模型在输入数据时,会分为稀疏特征和稠密特征,稀疏特征会进行编码并嵌入成一个张量,张量的大小为:(特征词典大小:嵌入维度),特征词典大小的意思是这个特征有多少个值,嵌入维度是认为指定的参数,16/32 等。
  • 报错 1:在特征嵌入时会提示:IndexError: index out of range in self
    • 报错信息
      IndexError Traceback (most recent call last) File /Users/ayd/Nutstore Files/坚果云/工作/亚信/项目管理/广东/智能推荐/5-模型开发/dssm_recall_infer_V4.py:305 303 print('\n\n产品和用户开始嵌入成向量! >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') 304 # 产品嵌入 --> 305 item_embedding = trainer.inference_embedding(model=model, mode="item", data_loader=x_prdct_dl, model_path=save_dir) 307 # # -------------单进程用户嵌入 308 # x_user_ = df_to_dict(user_features_df) 309 # x_user_dl_set_ = MatchDataGenerator(x=x_user_).dataset (...) 312 # print('=========单进程 >',user_embedding.shape) 313 # -------------多进程用户嵌入 314 start_time_ = time.time() File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/trainers/match_trainer.py:169, in MatchTrainer.inference_embedding(self, model, mode, data_loader, model_path) 167 for i, x_dict in enumerate(tk0): 168 x_dict = {k: v.to(self.device) for k, v in x_dict.items()} --> 169 y_pred = model(x_dict) 170 predicts.append(y_pred.data) 171 return torch.cat(predicts, dim=0) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1511, in Module._wrapped_call_impl(self, *args, **kwargs) 1509 return self._compiled_call_impl(*args, **kwargs) # type: ignore[misc] 1510 else: -> 1511 return self._call_impl(*args, **kwargs) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1520, in Module._call_impl(self, *args, **kwargs) 1515 # If we don't have any hooks, we want to skip the rest of the logic in 1516 # this function, and just call forward. 1517 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks 1518 or _global_backward_pre_hooks or _global_backward_hooks 1519 or _global_forward_hooks or _global_forward_pre_hooks): -> 1520 return forward_call(*args, **kwargs) 1522 try: 1523 result = None File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/models/matching/dssm.py:41, in DSSM.forward(self, x) 39 def forward(self, x): 40 user_embedding = self.user_tower(x) ---> 41 item_embedding = self.item_tower(x) 42 if self.mode == "user": 43 return user_embedding File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/models/matching/dssm.py:63, in DSSM.item_tower(self, x) 61 if self.mode == "user": 62 return None ---> 63 input_item = self.embedding(x, self.item_features, squeeze_dim=True) #[batch_size, num_features*embed_dim] 64 item_embedding = self.item_mlp(input_item) #[batch_size, item_params["dims"][-1]] 65 item_embedding = F.normalize(item_embedding, p=2, dim=1) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1511, in Module._wrapped_call_impl(self, *args, **kwargs) 1509 return self._compiled_call_impl(*args, **kwargs) # type: ignore[misc] 1510 else: -> 1511 return self._call_impl(*args, **kwargs) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1520, in Module._call_impl(self, *args, **kwargs) 1515 # If we don't have any hooks, we want to skip the rest of the logic in 1516 # this function, and just call forward. 1517 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks 1518 or _global_backward_pre_hooks or _global_backward_hooks 1519 or _global_forward_hooks or _global_forward_pre_hooks): -> 1520 return forward_call(*args, **kwargs) 1522 try: 1523 result = None File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/basic/layers.py:70, in EmbeddingLayer.forward(self, x, features, squeeze_dim) 68 if isinstance(fea, SparseFeature): 69 if fea.shared_with == None: ---> 70 sparse_emb.append(self.embed_dict[fea.name](x[fea.name].long()).unsqueeze(1)) 71 else: 72 sparse_emb.append(self.embed_dict[fea.shared_with](x[fea.name].long()).unsqueeze(1)) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1511, in Module._wrapped_call_impl(self, *args, **kwargs) 1509 return self._compiled_call_impl(*args, **kwargs) # type: ignore[misc] 1510 else: -> 1511 return self._call_impl(*args, **kwargs) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:1520, in Module._call_impl(self, *args, **kwargs) 1515 # If we don't have any hooks, we want to skip the rest of the logic in 1516 # this function, and just call forward. 1517 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks 1518 or _global_backward_pre_hooks or _global_backward_hooks 1519 or _global_forward_hooks or _global_forward_pre_hooks): -> 1520 return forward_call(*args, **kwargs) 1522 try: 1523 result = None File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/sparse.py:163, in Embedding.forward(self, input) 162 def forward(self, input: Tensor) -> Tensor: --> 163 return F.embedding( 164 input, self.weight, self.padding_idx, self.max_norm, 165 self.norm_type, self.scale_grad_by_freq, self.sparse) File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/functional.py:2237, in embedding(input, weight, padding_idx, max_norm, norm_type, scale_grad_by_freq, sparse) 2231 # Note [embedding_renorm set_grad_enabled] 2232 # XXX: equivalent to 2233 # with torch.no_grad(): 2234 # torch.embedding_renorm_ 2235 # remove once script supports set_grad_enabled 2236 _no_grad_embedding_renorm_(weight, input, max_norm, norm_type) -> 2237 return torch.embedding(weight, input, padding_idx, scale_grad_by_freq, sparse) IndexError: index out of range in self
       
    • 报错原因1:如果某一个特征 A 的值的个数比训练时的值要多,那么在嵌入这个特征的时候,多出来的编码号码拿去模型对应的嵌入层去检索就会导致检索下标越界
    • 报错原因2:如果某一个特征 A 的值的个数正常(没有超过词典大小),但是其中的某些数值比嵌入维度的最大值要大,也可能会导致这个报错。比如嵌入维度是(n*16),n 是词表大小,如果预测的时候对 A 进行编码时,编码后的部分数值大于 n(即使个数没有超过),则也会导致报此错误。这个时候可能是编码阶段出了问题。
  • 报错 2:在加载模型时会提示:size mismatch for embedding.embed_dict.u_user_star_level.weight: copying a param with shape torch.Size([6, 16]) from checkpoint, the shape in current model is torch.Size([7, 16]).
    • 报错信息
      RuntimeError Traceback (most recent call last) Cell In[23], line 2 1 print("inference embedding") ----> 2 item_embedding = trainer.inference_embedding(model=model, mode="item", data_loader=x_prdct_dl, model_path=save_dir) 3 user_embedding2 = trainer.inference_embedding(model=model, mode="user", data_loader=x_user_dl, model_path=save_dir) 4 # # 召回 File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch_rechub/trainers/match_trainer.py:161, in MatchTrainer.inference_embedding(self, model, mode, data_loader, model_path) 159 assert mode in ["user", "item"], "Invalid mode={}.".format(mode) 160 model.mode = mode --> 161 model.load_state_dict(torch.load(os.path.join(model_path, "model.pth"))) 162 model = model.to(self.device) 163 model.eval() File ~/miniconda3/envs/rec-deploy/lib/python3.8/site-packages/torch/nn/modules/module.py:2153, in Module.load_state_dict(self, state_dict, strict, assign) 2148 error_msgs.insert( 2149 0, 'Missing key(s) in state_dict: {}. '.format( 2150 ', '.join(f'"{k}"' for k in missing_keys))) 2152 if len(error_msgs) > 0: -> 2153 raise RuntimeError('Error(s) in loading state_dict for {}:\n\t{}'.format( 2154 self.__class__.__name__, "\n\t".join(error_msgs))) 2155 return _IncompatibleKeys(missing_keys, unexpected_keys) RuntimeError: Error(s) in loading state_dict for DSSM: size mismatch for embedding.embed_dict.u_user_star_level.weight: copying a param with shape torch.Size([6, 16]) from checkpoint, the shape in current model is torch.Size([7, 16]). size mismatch for embedding.embed_dict.user_id.weight: copying a param with shape torch.Size([42, 16]) from checkpoint, the shape in current model is torch.Size([198, 16]). size mismatch for embedding.embed_dict.u_main_plan_id.weight: copying a param with shape torch.Size([21, 16]) from checkpoint, the shape in current model is torch.Size([41, 16]). size mismatch for embedding.embed_dict.u_prodlist_in_use.weight: copying a param with shape torch.Size([21, 16]) from checkpoint, the shape in current model is torch.Size([41, 16]). size mismatch for embedding.embed_dict.p_product_name.weight: copying a param with shape torch.Size([50, 16]) from checkpoint, the shape in current model is torch.Size([55, 16]). size mismatch for embedding.embed_dict.product_id.weight: copying a param with shape torch.Size([50, 16]) from checkpoint, the shape in current model is torch.Size([55, 16]).
       
    • 报错原因:如果某一个特征 A 的值比训练时的值要少,那么在嵌入这个特征的时候不会报错,但是此时的特征词典大小就与之前训练的时候不一致了,所以嵌入后的特征张量在维度上就与之前训练好的不一致,就会导致尺寸匹配不上
  • 解决方法:这个问题本质上就是训练时特征值与预测数据的特征值有差异(预测时某些特征遇到了新的值或者新数据中少了一些值)最本质的方法就是(在全量数据上)人工处理稀疏特征编码将其保存,在预测阶段使用同样的特征编码。并且特征词典大小也保持一致。
    • 代码如下
      ############ 对类别特征进行编码处理,并保存 ########### feature_max_idx = {} # 记录每个字段的最大索引 feature_encoder = {} # 记录每个字段的编码 feature_decoder = {} # 记录每个字段的编码 sparse_features_original = user_multiClass_cols_original+prod_multiClass_cols_original+['u_is_unlimited_gprs','u_gprs_restrain','p_stackable'] # 後面這三個理論上是布爾值,但是被填充了字符串,歸一化處理不了,會導致後面報錯,所以當作多類別處理 sparse_features_original = [i for i in sparse_features_original if i in list(user_info.columns)+list(prdct_info.columns)] # 过滤一遍,避免错误 # sparse_features_original = [i for i in sparse_features_original if i not in ['user_id','product_id']] # 去除用户和产品 id for feat in sparse_features_original: if feat in list(user_info.columns): user_info[feat] = user_info[feat].fillna('999') # 先填充再编码 feat_encoder = {v:k+1 for k,v in enumerate(list(user_info[feat].unique()))} feat_encoder['unknown'] = len(list(user_info[feat].unique())) + 1 feature_max_idx[feat] = len(list(user_info[feat].unique())) + 2 else: prdct_info[feat] = prdct_info[feat].fillna('999') feat_encoder = {v:k+1 for k,v in enumerate(list(prdct_info[feat].unique()))} feat_encoder['unknown'] = len(list(prdct_info[feat].unique())) + 1 feature_max_idx[feat] = len(list(prdct_info[feat].unique())) + 2 feature_encoder[feat] = feat_encoder feat_decoder = {v:k for k,v in feat_encoder.items()} feature_decoder[feat] = feat_decoder with open('./result/feature_encoder.pkl', 'wb') as file: pickle.dump(feature_encoder, file) with open('./result/feature_decoder.pkl', 'wb') as file: pickle.dump(feature_decoder, file) with open('./result/feature_max_idx.pkl', 'wb') as file: pickle.dump(feature_max_idx, file) ########### 预测时加载并使用 ########### # 加载文件 with open('./result/feature_max_idx.pkl', 'rb') as file: feature_max_idx = pickle.load(file) with open('./result/feature_encoder.pkl', 'rb') as file: feature_encoder = pickle.load(file) with open('./result/feature_decoder.pkl', 'rb') as file: feature_decoder = pickle.load(file) # 稀疏字段处理 sparse_features = user_multiClass_cols+prod_multiClass_cols+['u_is_unlimited_gprs','u_gprs_restrain','p_stackable'] # 後面這三個理論上是布爾值,但是被填充了字符串,歸一化處理不了,會導致後面報錯,所以當作多類別處理 sparse_features = [i for i in sparse_features if i in samples_features_df_columns] # 过滤一遍,避免错误 # print('稀疏特征:',sparse_features) for feat in sparse_features: if feat == 'user_id' or feat == 'product_id': # lbe.classes_的值会随着 lbe.fit_transform 处理的数据而变化,有对应关系;leb.classes_是类属性 lbe = LabelEncoder() samples_features_df[feat] = lbe.fit_transform(samples_features_df[feat]) id_map = {encode_id: raw_id for encode_id, raw_id in enumerate(lbe.classes_)} #encode user id: raw user id else: feat_encoder = feature_encoder[feat] samples_features_df[feat] = samples_features_df[feat].map(lambda x: feat_encoder.get(x, feat_encoder['unknown'])) # 用字典映射 feature_max_idx[feat] = samples_features_df[feat].max() + 1 # 这里会作为词典大小,加1是为未知数据做保留
      此外,由于用户 id 和物品 id 时随时变化的,所以只要求保持单次预测的数量在训练数据的特征字典之内就行,而且需要在预测完之后对其进行还原,所以对这两个字段单独使用LabelEncoder方法来编码

问题六:

Dataloader引发死锁
  • 问题现象:并没有实际报错,但是运行的程序会卡住,一个进程开了几个子进程,子进程在开着但是很快就不工作了,也不占用 cpu 资源
  • 发现问题:通过日志发现,运行输出是卡在训练那一步,结果一直以为是训练那里出了问题;还以为是服务器会限制进程开启子进程的数量,一顿乱找结果毫无进展。后面发现训练并不会在cpu 上开启多进程;发现训练调用的函数里使用了pytorch 的数据加载 Dataloader,其中有个 num_workers 参数会指定加载数据的进程数,最终确定是这里的问题,
  • 解决方法:
    • possible deadlock in dataloader #1355(【pytorch】由dataloader引发的死锁)
    • 将num_workers设置为 0 或者 2 解决问题,我是设置成了 2(设置成 2 可以运行也是需要运气,成功率比较随机)。
    • 或者设置参数persistent_workers=True;persistent_workers (bool, optional) – If True, the data loader will not shutdown the worker processes after a dataset has been consumed once. This allows to maintain the workers Dataset instances alive. (default: False)(貌似也不行(dataloader在循环里,不知道在没有循环的情况下行不行))

问题七:

在 python 中无法开启多进程(是否成功比较随机),即使开启了多进程之间也是串行关系
  • 暂未解决,不过怀疑是服务器系统的原因(华为欧拉)

问题八:

召回结果的含义,正相关还是负相关
使用向量数据库annoy.query(v=user_emb, n=topk)召回时,返回的是余弦距离而不是余弦相似度(torch_rechub库源码中是这样的),所以最终的结果是在 0 到 2 之间,且值越小越相似;0 表示两个向量在同一方向上完全相同;1 表示两个向量在正交(垂直)方向上无关。;2 表示两个向量在相反方向上完全相反。
而余弦相似度是在 -1 到 1 之间,且值越大越相似。1 表示两个向量在同一方向上完全相同;0 表示两个向量在正交(垂直)方向上无关;-1 表示两个向量在相反方向上完全相反。

问题九:

模型训练在计算损失时,提示预测值不在 0-1 之间
Traceback (most recent call last): File "dssm_recall_train_V6_optimize_20240814.py", line 745, in <module> trainer.fit(train_dl) File "/usr/local/lib/python3.8/site-packages/torch_rechub/trainers/match_trainer.py", line 110, in fit self.train_one_epoch(train_dataloader) File "/usr/local/lib/python3.8/site-packages/torch_rechub/trainers/match_trainer.py", line 83, in train_one_epoch loss = self.criterion(y_pred, y) File "/usr/local/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1511, in _wrapped_call_impl return self._call_impl(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1520, in _call_impl return forward_call(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/torch/nn/modules/loss.py", line 618, in forward return F.binary_cross_entropy(input, target, weight=self.weight, reduction=self.reduction) File "/usr/local/lib/python3.8/site-packages/torch/nn/functional.py", line 3127, in binary_cross_entropy return torch._C._nn.binary_cross_entropy(input, target, weight, reduction_enum) RuntimeError: all elements of input should be between 0 and 1
开始怀疑是神经网络激活函数的问题,但是即使激活函数换成了sigmoid 也还是有相同的问题。
于是怀疑是数据问题:
  • 在数据输入模型之前,打印一次数据预览
  • 单独调用模型进行预测,然后打印输出
发现是其中某个字段的值全为 nan,导致最后模型预测的结果也全为 nan,最终导致报错。
依次向上排查到底是数据源有问题还是数据处理之后变为 nan 的。

问题十:Docker 相关问题

  1. 训练程序挂起不运行:在 Docker 容器外面配置定时调用 python 训练程序的时候,训练程序会卡在开始训练的地方一直没有日志输出。只能在容器内调用程序执行;
  1. 在 Docker 外运行预测程序没有文件生成,这是路径的问题,需要将相对路径转换为绝得路径。

问题十一: ’list’ object has no attribute ‘to’

notion imagenotion image
逐层溯源:
  • 出现这个问题的直接原因是:某些字段在进行张量转换的时候没有转换成功,还是list类型
  • 类型转换失败的原因是这个字段中的数据类型有问题(没有转换成数字、数据是混合类型等)
  • 数据类型问题最根本原因是数据处理的时候没有正确处理相应字段(比如某些字段是汉字的是和否,并没有将其处理成0、1)

问题十二:大数据量训练之后,预测的结果中有些产品的所有用户分数一致

可能原因

  • 数据不平衡,影响模型效果
    • 产品的订购数据严重不平衡,最多的产品订购数量2w加,最少的只有1条订购数据,总的600多产品
  • 模型过拟合
    • 模型训练的不够好,导致在嵌入用户特征和产品特征的时候不够有区分度;
  • 向量数据库检索的问题
    • 在召回时使用了annoy向量数据库,n_trees设置为60导致向量计算的精度不够;

尝试操作

  • 向量数据库的n_trees由50增加到100
  • 优化模型参数:防止模型过拟合,增加特征维度以增强表达
    • 学习率:0.01 —— 0.0001
    • L2衰减权重:0.0001 —— 0.001
    • 嵌入纬度:[128, 64] —— [256, 128, 64]

结果

效果变好,但是还是有大量的用户分数相同(召回100万用户,只有6w左右个分数),这应该是向量数据库的精度还不够,但是增大n_trees会导致内存溢出,因此必要的时候需要更换性能更好的向量数据库

内存管理问题

由于预测时的全量用户为 1.5 亿,数据量实在太大,而且服务器上没有 GPU 资源,所以只能通过开启多进程的方式来进行预测。在 python 中开启多进程无果后,只能想办法曲线救国,最终通过 shell 脚本将数据拆分然后循环调用 python 脚本,这个过程中可以将拆分后的数据的起始行数和数据量通过参数传递给python,最终以这样的方式成功开启了多进程。但随后遇到了另外一个问题:内存溢出导致的部分进程被 kill。
经过一系列的尝试:一开始开了 10 个进程,有两个或三个进程被 kill;一开始我还以为是进程开太多了,多个进程一起跑起来内存压力很大所有系统会有 kill 进程的行为。但是后来减少到 9 个,8 个都有不同数量的进程被 kill,最后减少到 5 个进程依然会被 kill;而且通过日志可以发现这些被 kill 的进程都是代码运行到同一个地方的时候被停止了,这就很好定位问题了。
找到这行代码发现,这行代码是在使用 pd.concat 方法合并一个列表中的 dataframe 数据;这列表中有 8 个 df,使用concat方法一起合并的话,需要的瞬时内存峰值会很大,而此时还有其他进程也在运行且需要内存的,所以很有可能是在这里瞬间占用大量内存的时候被系统大大给 kill 了。有了这样的猜想之后就有方向了,既然瞬时内存峰值太高,那我就两两合并试试看,并且在循环中不新增变量来额外增加内存负担。不管是不是先改了运行再说。
# 原来的处理 match_res_df = pd.concat(match_res_df_l, ignore_index=True) # 改进后的处理 match_res_df = match_res_df_l[0] for df in match_res_df_l[1:]: match_res_df = pd.concat([match_res_df, df], ignore_index=True)
此外,还对代码中的其他使用完的中间变量进行了删除,并在合并数据之前调用了python 的垃圾回收机制,来强行回收内存。
del user_info_ del user_info_df_l # 删除中间数据,减少内存占用 import gc gc.collect() # 强制进行垃圾回收
经过以上的改动,多进程成功运行到寿终正寝,终于再没有短命的进程了,它真的我哭死(ㄒoㄒ)。

20241014更新

继续更新过模型参数之后,内存又不够了。此次采取的优化措施:
  • 数据处理和嵌入阶段分批处理,保证内存的使用曲线更平滑,过程中严格控制中间变量的使用和删除,没用的变量立刻清理回收
  • 数据召回阶段分块处理,将数据分成多块,没块数据单独构建向量数据库索引,然后召回相应数量的用户数,每个块处理完之后立刻释放内存
    • 召回阶段分块的好处,不仅能够减轻内存的压力,又能够在召回时减少相同分数的数量(annoy向量数据库的索引逻辑是将数据分布在不同的块区间,在匹配时相同块区间的数据分数相同或相近)。
  • 内存映射内存映射的原理是将数据保存到文件,在使用时分批读取,这样能减轻内存的压力,但是缺点是在某些需要同时使用所有数据的场景行不通(如训练模型,向量数据库构建索引),但是在本方法中,召回阶段是分段的,每段数据单独构建向量数据库的索引,所以可以使用内存映射来处理数据。(理论方法,未实现
  • 加入内存监控
自从入行以来,从来没有重视过内存管理这块的东西,但这次真的是该遭的罪一点没落下的全体验了一遍。试问苍天饶过谁。

GPT回答的问题

召回结果分数一致问题

我开发了一个双塔模型,给出一条产品数据,根据产品数据去召回一定数量的用户;但是最终在推理的时候会出现同一个产品的所有召回用户的评分都是一样的。我怀疑可能的原因是: 1、模型训练的不够好,导致在嵌入用户特征和产品特征的时候不够有区分度; 2、在召回时使用了annoy向量数据库,n_trees设置为60导致向量计算的精度不够;
你觉得我的猜测合理吗?有没有其他可能的原因?我应该怎么优化这些问题?模型参数优化时应该如何设置参数(训练数据有400万,并且其中的数据很不平衡,大约600个产品,85%产品的订购数据不足100条)?更换如果annoy向量数据库不能满足效果,应该更换成哪一种更合适?
💡
你提出的两个原因都有可能导致推理时同一个产品的所有召回用户的评分一样。让我们深入分析并讨论可能的优化方法:
1. 模型训练不够好,嵌入区分度不足
  • 可能性:如果模型在嵌入用户特征和产品特征时没有足够的区分度,可能导致嵌入向量相似性不足。特征表达能力不够强,可能会让不同用户在与相同产品交互时产生相似的评分。这可能是由于以下原因:
    • 训练数据不均衡:大部分产品的订购数据较少,模型可能在这些产品上过拟合,或难以学习到有意义的特征。85%的产品订购数据不足100条,训练时这些样本的权重不足,导致嵌入向量区分度降低。
    • 模型复杂度不足:双塔模型可能需要更高维度的嵌入或更复杂的网络结构来学习更加细致的特征表示。
优化建议
  • 尝试增加嵌入向量的维度,提高模型的特征表达能力。
  • 使用更复杂的模型架构,比如引入注意力机制或层次化的嵌入。
  • 数据不平衡问题可以通过使用 Weighted SamplingFocal Loss 来提高模型对低频样本的敏感度。
  • 考虑使用数据增强,扩充少量数据的产品样本。
2. Annoy向量数据库的n_trees设置影响精度
  • 可能性:Annoy数据库本身是一种基于近似最近邻(ANN)的算法。设置 n_trees=60 可能导致精度不够,召回时未能找到精确的相似向量,导致相同产品的召回用户评分相似。
  • n_trees 参数越大,召回的精度越高,但代价是查询时间增加。
优化建议
  • 增加 n_trees 的值,尝试更高的树数量(如 100 或更高),以提升召回精度。
  • 尝试其他ANN库,比如 Faiss,它在速度和精度之间有很好的平衡,尤其适合高维度向量。
  • 若召回精度要求非常高,可以尝试使用传统的 KNN,虽然速度较慢,但能保证准确的最近邻查找。
3. 其他可能原因
  • 用户与产品特征空间的不一致:如果用户特征和产品特征分布差异太大,可能会影响召回时的匹配质量。可以检查嵌入后的向量分布,确保特征空间合理。
  • 训练数据与测试数据分布差异:如果训练数据中的产品和用户交互模式与实际推理时的情况不同,可能会导致模型的泛化能力不足。可以通过进一步的数据分析和模型验证来判断是否存在数据偏差问题。
参数优化建议
  • 嵌入维度:可以逐步增加嵌入维度,观察模型效果的变化。通常 128 或 256 是比较常见的维度,但你可以根据效果做实验调整。
  • 学习率:对于不平衡数据,可以尝试较小的学习率,减少过拟合的风险。
  • 损失函数:尝试使用基于对比学习的损失函数,比如 Triplet LossContrastive Loss,它们可以有效提升嵌入空间的区分度。
  • Batch Size:适当调整 batch size,使用较大的批次(例如 1024),结合加速器(如 GPU)来提高训练效率。
更换Annoy
如果发现 Annoy 无法满足效果,推荐更换成 Faiss,它是高效的近似最近邻搜索库,尤其在处理大规模向量时表现优秀。同时支持 GPU 加速,可以提高召回速度。
通过模型调整和数据库优化,你应该能够减少相同产品召回用户评分一致的情况。
在训练时设置了Batch Size为512,那么在推理时还是会使用这个值来进行数据分批吗?但是在预测时我进行了多进程处理,预测的时候可能会导致进程被kill,所以应该怎么取一个合适的Batch Size
💡
在推理时,Batch Size 可以与训练时不同,通常可以根据硬件资源和内存限制进行调整。使用合适的 Batch Size 有助于提高推理效率,特别是在多进程处理时。

选择合适的 Batch Size 的建议:

  1. 内存限制:考虑到服务器的内存(700G),可以在推理时进行逐步测试。尝试从小的 Batch Size 开始(如 64 或 128),然后逐渐增大,直到接近内存限制或遇到进程被 kill 的情况。
  1. 资源监控:使用工具监控 GPU 和内存使用情况,确保不会超出限制。可以利用 nvidia-smi 监控 GPU 使用率。
  1. 多进程处理:确保每个进程使用的 Batch Size 不会过大,防止累计内存使用过高。可以尝试将每个进程的 Batch Size 限制在较小范围内,例如 128 或 256。
  1. 动态调整:实现一个动态调整策略,如果发现进程经常被 kill,可以降低 Batch Size 进行推理,确保稳定性。
结合以上建议,找到一个适合你服务器配置的平衡点,以便顺利进行推理。

向量数据库分段召回问题

我现在有3000万个向量想使用annoy向量数据库来做相似度匹配召回,假设对一个查询向量总的需要召回100万个结果,现在有两种方法: 1、将这3000万个向量一起构建数据库索引,然后召回100万数据; 2、将这3000万个向量平均分成10份,每一份数据单独构建索引,然后每一份数据单独召回10万条数据;
这两种方法会不会导致以下结果? 1、由于annoy向量数据库的特性,将数据一起构建索引召回的结果会不会有大量的相同的分数?将数据拆分能不能缓解这种情况?使得分数更多样,如果拆分的份数更多会更好吗? 2、将数据拆分后,内存压力会不会减小? 3、会不会导致其他可能的问题?
💡
关于这两种方法的影响分析如下:
  1. 相同分数的问题
      • 将数据一起构建索引:Annoy 的特性是基于随机投影的近似最近邻搜索。即使索引一起构建,由于使用了多个随机树,结果可能不会有完全相同的分数。不过,如果相似度匹配中的查询向量与部分数据分布过于接近,可能出现评分相近的情况。
      • 将数据拆分后构建多个索引:拆分后每部分独立构建索引可以减少在同一批数据上召回相似分数的概率,因为不同的数据集可能有不同的局部结构,能使召回结果的分数更加多样。不过拆得太细可能导致整体相似性的下降,因为每个子索引覆盖的数据量减少,匹配的全局性降低。
      • 更好的策略:适当的拆分可能有助于缓解相同分数的情况,但过度拆分反而可能导致召回效果下降,建议根据数据的实际分布情况进行尝试和评估。
  1. 内存压力
      • 将数据一起构建索引:一个大的索引会占用更多内存,同时在进行查询时,加载整个索引的数据会对内存产生较大压力。
      • 将数据拆分:拆分可以有效减少每个索引构建和查询时的内存消耗,因为每次只需要加载部分数据进行计算。但在查询时需要对多个索引分别进行召回,查询时间可能会增加(虽然可以并行化处理)。
  1. 其他潜在问题
      • 拆分影响召回的质量:拆分可能导致召回结果的全局性下降,因为每个子索引只包含部分数据,查询结果会局限在局部数据范围内,可能会漏掉更优的相似项。
      • 并行查询的复杂度:需要额外处理多个索引的结果融合,可能增加系统的复杂度。
      • 调优成本:你需要针对不同拆分策略进行实验,找到适合的拆分规模,并可能针对每个子索引调整 n_trees 等参数。
综上,将数据适当拆分有可能在内存和结果分数的多样性上有所提升,但需要平衡拆分的粒度,避免召回效果的下降。
但是Annoy在构建数据索引时不是将数据按区域存储吗?相似的数据存储到一个区域,这样在计算相似度的时候就是计算那个区域的相似度,在召回时就会将分值高的区域内的值都返回,然后这些数据的分值都相同。
我目前的结果就是,召回500万数据,其中不同的分值个数只有25万个,说明平均每个分数对应的数据量有20多条数据
💡
你观察到的情况,确实是 Annoy 在构建索引时的一种特性。Annoy 使用的是多树随机投影技术,每个树的内部节点会基于数据的局部特性分区域存储,因此在搜索时,如果查询向量和一个区域内的多个点都相似,那么这些点的相似度分数会非常接近甚至相同。这种情况在你召回大量结果时特别明显,因为搜索过程通常会集中在最接近的那些区域内。
针对你的具体问题和观察结果,以下是一些可能的解释和建议:
  1. 相同分数的原因
      • 区域集中:Annoy 的多树结构将相似的数据点集中在一起,导致这些区域内的多个点返回相同或接近的分数。当你召回大量结果时,尤其是500万条数据,可能大量结果来自这些高度相似的区域,因此出现了较少的分数类别。
      • 数据本身的分布:如果你的数据本身在高维空间中分布集中(即很多点彼此非常接近),即便 Annoy 进行了多树分割,结果中的相似度也会表现为很多点拥有相同的分数。
  1. 拆分索引是否有帮助
      • 缓解同分情况:拆分索引可能有助于缓解每个区域过于集中的问题,因为每个子索引只处理数据的一部分,可能会减少集中召回特定区域内数据的情况,使得分数分布更为分散。然而,正如之前提到的,拆得过细可能导致全局性召回效果下降,漏掉一些可能的好结果。
      • 调节 n_trees:增加 n_trees 的数量可能会细化 Annoy 对空间的划分,使得数据在更多树中有不同的分布,进而可能增加返回结果的分数种类。
  1. 进一步建议
      • 使用其他向量搜索库:如果相同分数的现象对你的应用有较大影响,可以考虑使用其他近似最近邻搜索算法,比如 FAISS(Facebook 提供的),它提供了更加灵活的参数调优,能够处理更精细的分数差异。
      • 调整查询规模:如果召回的结果集中在相同分数的区域,你可以尝试调整查询规模和 k 值,使得返回的结果更加分散。
结合你的情况,适度拆分索引或调整 Annoy 的 n_trees 参数可能有助于改善召回分数的多样性。你也可以考虑通过实验不同拆分和索引策略,来找到最适合你数据的解决方案。

内存优化问题

💡
在服务器中使用shell脚本循环调用该脚本,开启5个python进程,但是根据日子内容得知,部分进程总是会在嵌入用户数据后被系统kill(不知道是在合并数据时被kill还是在向量数据库召回的时候被kill)。已知每个进程需要处理2500万用户数据,服务器有100个核心,700G内存。我应该如何优化代码以保证进程不被kill?
💡
根据您的描述,我建议采取以下优化措施来减少进程被kill的可能性:
  1. 分批处理数据:
    1. def data_generator(user_info_df, batch_size): for i in range(0, len(user_info_df), batch_size): yield user_info_df[i:i+batch_size] # 在用户数据处理和嵌入部分 for user_info_df_i in user_info_df_l: for user_info_df in data_generator(user_info_df_i, 1000000): # 每次处理100万行数据 # 处理和嵌入逻辑 # ...
  1. 及时释放内存:
    1. import gc # 在处理完每批数据后 del df_i, df_dict_, df_dict_set_, df_dict_dl, user_emb_i gc.collect()
  1. 使用内存映射文件:
    1. import numpy as np # 替换 torch.cat 操作 user_embedding_file = np.memmap('user_embedding.dat', dtype='float32', mode='w+', shape=(total_users, embedding_dim)) # 在循环中更新 user_embedding_file[start_idx:end_idx] = user_emb_i.numpy() # 使用完毕后 del user_embedding_file
      内存映射文件的原理:
      • 内存映射是一种将文件或文件的一部分直接映射到进程的地址空间的技术。
      • 它允许程序像访问内存一样访问文件,而不需要显式的文件I/O操作。
      • 操作系统负责管理实际的文件读写,只在需要时才将数据从磁盘加载到内存中。
  1. 优化向量召回:
    1. def item_to_user_match(user_embedding, item_embedding, test_user, all_item, user_map, item_map, user_col='user_id', item_col='product_id', topk=5000000): # ... 现有代码 ... match_res_list = [] for item_id, item_emb in zip(all_item[item_col], item_embedding): users_idx, users_scores = annoy.query(v=item_emb, n=topk) match_indexToUsers = list(np.vectorize(user_map.get)(test_user[user_col][users_idx])) match_res_list.extend([{'product_id': item_map[item_id], 'user_id': user, 'prob': score} for user, score in zip(match_indexToUsers, users_scores)]) if len(match_res_list) >= 1000000: # 每100万条结果写入一次 match_res_df = pd.DataFrame(match_res_list) match_res_df.to_csv('temp_results.csv', mode='a', header=False, index=False) match_res_list = [] # 处理剩余结果 if match_res_list: match_res_df = pd.DataFrame(match_res_list) match_res_df.to_csv('temp_results.csv', mode='a', header=False, index=False) # 读取最终结果 match_res_df = pd.read_csv('temp_results.csv', names=['product_id', 'user_id', 'prob']) os.remove('temp_results.csv') return match_res_df
  1. 监控内存使用:
    1. import psutil def print_memory_usage(): process = psutil.Process() print(f"内存使用: {process.memory_info().rss / 1024 / 1024:.2f} MB") # 在关键点调用此函数 print_memory_usage()
  1. 考虑使用多进程处理:
    1. import multiprocessing def process_batch(batch_data): # 处理和嵌入逻辑 # ... return user_emb_i if __name__ == '__main__': # ... with multiprocessing.Pool(processes=5) as pool: results = pool.map(process_batch, data_generator(user_info_df, 1000000)) user_embedding = torch.cat(results, dim=0)
这些优化措施应该能够显著减少内存使用,降低进程被kill的风险。如果问题仍然存在,可能需要进一步调整批处理大小或考虑使用分布式处理框架如Dask或PySpark。
对于本文内容有任何疑问, 可与我联系.