import os import sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.join(BASE_DIR)) from concurrent import futures from abtest import user_reco_pb2 from abtest import user_reco_pb2_grpc from setting.default import DefaultConfig, RAParam from server.reco_centor import RecoCenter import grpc import time import json import hashlib def add_track(res, temp): """ 封装埋点参数 :param res: 推荐文章id列表 :param cb: 合并参数 :param rpc_param: rpc参数 :return: 埋点参数 文章列表参数 单文章参数 """ # 添加埋点参数 track = {} # 准备曝光参数 # 全部字符串形式提供,在hive端不会解析问题 _exposure = {"action": "exposure", "userId": temp.user_id, "articleId": json.dumps(res), "algorithmCombine": temp.algo} track['param'] = json.dumps(_exposure) track['recommends'] = [] # 准备其它点击参数 for _id in res: # 构造字典 _dic = {} _dic['article_id'] = _id _dic['param'] = {} # 准备click参数 _p = {"action": "click", "userId": temp.user_id, "articleId": str(_id), "algorithmCombine": temp.algo} _dic['param']['click'] = json.dumps(_p) # 准备collect参数 _p["action"] = 'collect' _dic['param']['collect'] = json.dumps(_p) # 准备share参数 _p["action"] = 'share' _dic['param']['share'] = json.dumps(_p) # 准备detentionTime参数 _p["action"] = 'read' _dic['param']['read'] = json.dumps(_p) track['recommends'].append(_dic) track['timestamp'] = temp.time_stamp return track def feed_recommend(user_id, channel_id, article_num, time_stamp): """ 1、 实验中心流量切分 2、调用推荐中心,给不同用户按照不同算法推荐结果 3、推荐结果(曝光申请新文章)与用户申请参数进行埋点参数封装 :param user_id: :param channel_id: :param article_num: :param time_stamp: :return: """ class TempParam(object): user_id = -10 channel_id = -10 article_num = -10 time_stamp = -10 algo = "" temp = TempParam() temp.user_id = user_id temp.channel_id = channel_id temp.article_num = article_num # 请求的时间戳大小 temp.time_stamp = time_stamp # 对用户进行判断 if temp.user_id == "": temp.algo = "" return add_track([], temp) # 进行对用户ID分流 bucket = hashlib.md5(temp.user_id.encode()).hexdigest()[:1] if bucket in RAParam.BYPASS[0]['Bucket']: temp.algo = RAParam.BYPASS[0]['Strategy'] else: temp.algo = RAParam.BYPASS[1]['Strategy'] # 2、根据用户ID,channel_id, 用户分配的算法进行召回读取和排序 track = RecoCenter().feed_recommend_logic(temp) # 3、根据推荐结果封装埋点参数 # track = add_track([], temp) return track class UserRecommendServicer(user_reco_pb2_grpc.UserRecommendServicer): """实现推荐接口的调用逻辑 """ def user_recommend(self, request, context): """用于feed流的推荐 :param request: :param context: :return: """ # 解析出用户请求的参数 # string user_id = 1; # int32 channel_id = 2; # int32 article_num = 3; # int64 time_stamp = 4; user_id = request.user_id channel_id = request.channel_id article_num = request.article_num time_stamp = request.time_stamp # # 埋点参数参考: # # { # # "param": '{"action": "exposure", "userId": 1, "articleId": [1,2,3,4], "algorithmCombine": "c1"}', # # "recommends": [ # # {"article_id": 1, "param": {"click": # "{"action": "click", "userId": "1", "articleId": 1, # "algorithmCombine": 'c1'}", "collect": "", "share": "","read":""}}, # ... # # ] # # "timestamp": 1546391572 # # } # 根据参数进行推荐,返回空 1 [1,2,3,4],把推荐的结果封装成标准的埋点参数接口返回给web # 进入实验室中流量切分,分配用户不同算法 _track = feed_recommend(user_id, channel_id, article_num, time_stamp) # 直接返回WEb参数结果,proto协议定义好的格式 # 从后往前的顺序 # _param1参数封装 # [user_reco_pb2.param1(article_id=, params=), user_reco_pb2.param1(article_id=, params=)] _recommend = [] for _ in _track['recommends']: _param2 = user_reco_pb2.param2(click=_['param']['click'], collect=_['param']['collect'], share=_['param']['share'], read=_['param']['read']) _param1 = user_reco_pb2.param1(article_id=_['article_id'], params=_param2) _recommend.append(_param1) return user_reco_pb2.Track(exposure=_track['param'], recommends=_recommend, time_stamp=_track['timestamp']) # def article_recommend(self, request, context): # """ # 猜你喜欢 接口 # :param request: # :param context: # :return: # """ # # return None def serve(): import setting.logging as lg lg.create_logger() # 多线程服务器 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 注册本地服务 user_reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendServicer(), server) # 监听端口 server.add_insecure_port(DefaultConfig.RPC_SERVER) # 开始接收请求进行服务 server.start() # 使用 ctrl+c 可以退出服务 _ONE_DAY_IN_SECONDS = 60 * 60 * 24 try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': # 测试grpc服务 serve()