import ast
import hashlib
import json
import logging
import os
import sys
from datetime import datetime

# The project root must be on sys.path before the project-local imports below.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))

from setting.default import RAParam
from server.utils import HBaseUtils
from server import pool
from server.recall_service import ReadRecall
from server.redis_cache import get_cache_from_redis_hbase
from server.sort_service import lr_sort_service

# Maps a sort-strategy number to the callable that implements it.
sort_func = {
    200: lr_sort_service
}

logger = logging.getLogger('recommend')

# Per-article tracking actions, in the order they are written to the payload.
_TRACK_ACTIONS = ('click', 'collect', 'share', 'read')


def _now():
    """Return the current local time formatted for the log line prefix."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def _parse_id_list(raw):
    """Parse a stored article-id list literal (e.g. ``b'[1, 2, 3]'``).

    NOTE: this deliberately replaces the previous ``eval(...)`` calls.
    ``ast.literal_eval`` only accepts Python literals, so a corrupted or
    hostile cell value cannot execute code. Bytes are decoded first because
    ``literal_eval`` (unlike ``eval``) does not parse bytes input.

    :param raw: cell value as ``bytes`` or ``str``
    :return: the parsed literal (a list of ids for values written by
        ``RecoCenter.user_reco_list``)
    :raises ValueError, SyntaxError: if the value is not a valid literal
    """
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode()
    if isinstance(raw, str):
        # eval() tolerated surrounding whitespace; keep that tolerance.
        raw = raw.strip()
    return ast.literal_eval(raw)


def add_track(res, temp):
    """Build the tracking ("buried point") parameters for a recommendation.

    :param res: list of recommended article ids
    :param temp: request object; ``user_id``, ``algo`` and ``time_stamp``
        are read from it
    :return: dict with keys ``param`` (JSON exposure event), ``recommends``
        (one entry per article with JSON click/collect/share/read events)
        and ``timestamp``
    """
    # Exposure event for the whole list. Everything is serialized as a
    # string (per the original author: so the Hive side can parse it).
    exposure = {"action": "exposure",
                "userId": temp.user_id,
                "articleId": json.dumps(res),
                "algorithmCombine": temp.algo}

    track = {'param': json.dumps(exposure), 'recommends': []}

    for article_id in res:
        event = {"action": "click",
                 "userId": temp.user_id,
                 "articleId": str(article_id),
                 "algorithmCombine": temp.algo}

        # The same event is serialized once per action; only "action" differs.
        params = {}
        for action in _TRACK_ACTIONS:
            event["action"] = action
            params[action] = json.dumps(event)

        track['recommends'].append({'article_id': article_id, 'param': params})

    track['timestamp'] = temp.time_stamp
    return track


class RecoCenter(object):
    """Recommendation centre: serves fresh recommendations or history pages."""

    def __init__(self):
        self.hbu = HBaseUtils(pool)
        self.recall_server = ReadRecall()

    def feed_recommend_logic(self, temp):
        """Handle a feed request: new recommendations or a history page.

        Compares the request timestamp with the timestamp of the user's
        latest ``history_recommend`` cell for the channel:

        * latest history < request timestamp: serve from the cache, or run
          recall + sort when the cache is empty;
        * otherwise: the user is paging through history, so return the
          stored record together with the timestamp of the previous one
          (``0`` when there is none).

        ``temp.time_stamp`` (and ``temp.algo`` in the history branch) are
        overwritten as part of building the response.

        :param temp: request object (``user_id``, ``channel_id``,
            ``time_stamp``, ``algo`` and ``article_num`` are used)
        :return: tracking dict produced by :func:`add_track`
        """
        # 1. Timestamp of the most recent history record for user/channel.
        try:
            last_stamp = self.hbu.get_table_row(
                'history_recommend',
                'reco:his:{}'.format(temp.user_id).encode(),
                'channel:{}'.format(temp.channel_id).encode(),
                include_timestamp=True)[1]
        except Exception as e:
            # Best effort: no readable history is treated as "never recommended".
            logger.warning("{} WARN read history recommend exception:{}".format(
                _now(), e))
            last_stamp = 0
        else:
            logger.info("{} INFO get user_id:{} channel:{} history last_stamp".format(
                _now(), temp.user_id, temp.channel_id))

        # 2. Request is newer than the latest history: fresh recommendation.
        if last_stamp < temp.time_stamp:
            res = get_cache_from_redis_hbase(temp, self.hbu)
            if not res:
                logger.info("{} INFO get user_id:{} channel:{} recall/sort data".
                            format(_now(), temp.user_id, temp.channel_id))
                res = self.user_reco_list(temp)

            # Hand back the previous history timestamp with the new results.
            temp.time_stamp = last_stamp
            return add_track(res, temp)

        # 3. Request is at or before the latest history: paging history.
        logger.info("{} INFO read user_id:{} channel:{} history recommend data".format(
            _now(), temp.user_id, temp.channel_id))

        # Default result. Previously `res` was only bound in the except
        # handler and the if/elif arms below, so a single returned cell whose
        # timestamp differed from the request raised UnboundLocalError.
        res = []
        try:
            # NOTE(review): presumably returns (value, timestamp) tuples no
            # newer than the requested timestamp, newest first - confirm
            # against HBaseUtils.get_table_cells.
            row = self.hbu.get_table_cells(
                'history_recommend',
                'reco:his:{}'.format(temp.user_id).encode(),
                'channel:{}'.format(temp.channel_id).encode(),
                timestamp=temp.time_stamp + 1,
                include_timestamp=True)
        except Exception as e:
            logger.warning("{} WARN read history recommend exception:{}".format(
                _now(), e))
            row = []

        if not row:
            # No (older) history at all: empty result, previous timestamp 0.
            temp.time_stamp = 0
        elif len(row) == 1 and temp.time_stamp == row[0][1]:
            # Exactly the oldest record was requested: nothing before it.
            res = _parse_id_list(row[0][0])
            temp.time_stamp = 0
        elif len(row) >= 2:
            # Return the requested record plus the timestamp of the one before.
            temp.time_stamp = int(row[1][1])
            res = _parse_id_list(row[0][0])
        # NOTE(review): one cell with a different timestamp falls through
        # with an empty result and an unchanged timestamp - confirm intent.

        # Stored ids are not guaranteed to be ints.
        res = list(map(int, res))
        logger.info(
            "{} INFO history:{}, {}".format(_now(), res, temp.time_stamp))

        # History pages carry no algorithm-combination name.
        temp.algo = ''
        return add_track(res, temp)

    def _read_history_ids(self, user_id, channel_id, info_template):
        """Collect the ids stored in a user's history for one channel.

        Read or parse failures are logged and swallowed; ids gathered
        before the failure are still returned.

        :param user_id: user whose history row is read
        :param channel_id: channel column to read
        :param info_template: log format taking (time, user_id, channel_id),
            emitted on success
        :return: set of ids exactly as stored (not converted to int)
        """
        seen = set()
        try:
            cells = self.hbu.get_table_cells(
                'history_recommend',
                'reco:his:{}'.format(user_id).encode(),
                'channel:{}'.format(channel_id).encode())
            for cell in cells:
                seen.update(_parse_id_list(cell))
            logger.info(info_template.format(_now(), user_id, channel_id))
        except Exception as e:
            logger.warning(
                "{} WARN filter history article exception:{}".format(_now(), e))
        return seen

    def user_reco_list(self, temp):
        """Recall, filter, sort and store recommendations for a request.

        Reads every recall source configured for ``temp.algo``, removes
        articles already present in the user's history (request channel and
        channel 0), sorts the remainder, returns the first
        ``temp.article_num`` ids and stores the rest in ``wait_recommend``.
        The returned ids are also written to ``history_recommend``.

        :param temp: request object (``user_id``, ``channel_id``, ``algo``,
            ``article_num`` and ``time_stamp`` are used)
        :return: list of article ids to recommend (may be empty)
        """
        # 1. Merge all recall sources assigned to this algorithm combination.
        recalled = set()
        for _num in RAParam.COMBINE[temp.algo][1]:
            if _num == 103:
                res = self.recall_server.read_redis_new_article(temp.channel_id)
            elif _num == 104:
                res = self.recall_server.read_redis_hot_article(temp.channel_id, 10)
            else:
                res = self.recall_server.read_hbase_recall_data(
                    RAParam.RECALL[_num][0],
                    'recall:user:{}'.format(temp.user_id).encode(),
                    '{}:{}'.format(RAParam.RECALL[_num][1], temp.channel_id).encode())
            recalled.update(res)

        # 2. History of the request channel, plus channel 0 so the
        #    recommend channel and the other channels do not repeat articles.
        history = self._read_history_ids(
            temp.user_id, temp.channel_id,
            "{} INFO read user_id:{} channel_id:{} history data")
        history |= self._read_history_ids(
            temp.user_id, 0,
            "{} INFO filter user_id:{} channel:{} history data")

        # Normalize ids to int before filtering out what was already shown.
        reco_set = list(set(map(int, recalled)).difference(map(int, history)))

        # 3. Sort with the strategy configured for this algorithm combination.
        reco_set = sort_func[RAParam.COMBINE[temp.algo][2][0]](reco_set, temp, self.hbu)

        logger.info("{} INFO after filter user_id:{} reco_set:{}".format(
            _now(), temp.user_id, reco_set))

        if not reco_set:
            return reco_set

        # 4. Return the first article_num ids; the surplus is kept in
        #    wait_recommend for later requests.
        if len(reco_set) <= temp.article_num:
            res = reco_set
        else:
            res = reco_set[:temp.article_num]
            logger.info(
                "{} INFO put user_id:{} channel:{} wait data".format(
                    _now(), temp.user_id, temp.channel_id))
            self.hbu.get_table_put('wait_recommend',
                                   'reco:{}'.format(temp.user_id).encode(),
                                   'channel:{}'.format(temp.channel_id).encode(),
                                   str(reco_set[temp.article_num:]).encode(),
                                   timestamp=temp.time_stamp)

        # Record what is about to be recommended in the history table.
        self.hbu.get_table_put('history_recommend',
                               'reco:his:{}'.format(temp.user_id).encode(),
                               'channel:{}'.format(temp.channel_id).encode(),
                               str(res).encode(),
                               timestamp=temp.time_stamp)
        logger.info(
            "{} INFO store recall/sorted user_id:{} channel:{} history_recommend data".format(
                _now(), temp.user_id, temp.channel_id))

        return res