# make_train_data.py
# -*- coding: UTF-8 -*-
import os
import time

import findspark

# findspark locates the local Spark installation so that pyspark can be imported.
findspark.init()

from pyspark.sql import SparkSession

# JAVA_HOME must point at a JDK before the JVM is launched by getOrCreate().
os.environ['JAVA_HOME'] = '/usr/java/jdk1.8.0_181-cloudera'

spark_session = SparkSession.builder.master("local[*]").appName("hive_test_1") \
    .config("hive.metastore.uris", "thrift://127.0.0.1:9083") \
    .enableHiveSupport().getOrCreate()
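# Note: local[*] runs Spark on all local cores, and hive.metastore.uris must
# point at a running Hive metastore so the rcn_prod tables below can be resolved.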
def get_jd_sql():
    # Join JD (job description) records with their content, company, and
    # owning delivery project.
    jdsql = """
    select rcn_prod.t_jd.id, rcn_prod.t_jd.jd_content_id, rcn_prod.t_jd_content.title,
        rcn_prod.t_jd_content.company_name, rcn_prod.t_jd_content.work_age,
        rcn_prod.t_jd_content.workplaces, rcn_prod.t_jd_content.recruit_number,
        rcn_prod.t_jd_content.min_annual_salary, rcn_prod.t_jd_content.max_annual_salary,
        rcn_prod.t_company.name as t_companyname, rcn_prod.t_delivery_project.degree,
        rcn_prod.t_delivery_project.work_age as workage
    from rcn_prod.t_jd, rcn_prod.t_jd_content, rcn_prod.t_company, rcn_prod.t_delivery_project
    where rcn_prod.t_jd.jd_content_id = rcn_prod.t_jd_content.id
        and rcn_prod.t_company.id = rcn_prod.t_jd_content.company_id
        and rcn_prod.t_delivery_project.id = rcn_prod.t_jd.main_project_id
    """
    return jdsql
def get_cv_sql():
    # Join candidate resumes with the school, major, and work-status lookup tables.
    cvsql = """
    select
        rcn_prod.t_talent_resume.id as id, rcn_prod.t_talent_resume.name,
        rcn_prod.t_talent_resume.date_of_birth, rcn_prod.t_talent_resume.gender,
        rcn_prod.t_talent_resume.age, rcn_prod.t_talent_resume.marital_status,
        rcn_prod.t_talent_resume.work_year, rcn_prod.t_talent_resume.current_company,
        rcn_prod.t_talent_resume.current_position, rcn_prod.t_talent_resume.current_salary,
        rcn_prod.t_talent_resume.industry,
        rcn_prod.t_school.name as school_name, rcn_prod.t_school.id as school_id,
        rcn_prod.t_major.name as major_name, rcn_prod.t_major.id as major_id,
        rcn_prod.t_work_status.name as status_name, rcn_prod.t_work_status.id as status_id,
        rcn_prod.t_talent_resume.created_at
    from rcn_prod.t_talent_resume, rcn_prod.t_school, rcn_prod.t_major, rcn_prod.t_work_status
    where rcn_prod.t_school.id = rcn_prod.t_talent_resume.school_id
        and rcn_prod.t_major.id = rcn_prod.t_talent_resume.major_id
        and rcn_prod.t_work_status.id = rcn_prod.t_talent_resume.work_status_id
    order by rcn_prod.t_talent_resume.created_at desc
    """
    return cvsql
def get_order_sql():
    # Labelled delivery orders: orders with an operation of type 401 become
    # positive samples (label 1); orders with an operation of type 402 become
    # negative samples (label 0).
    orderSql = """
    (SELECT id, jd_id, resume_id, 1 as label
     FROM rcn_prod.t_delivery_order
     WHERE EXISTS (
         SELECT 1 FROM rcn_prod.t_delivery_order_operation
         WHERE rcn_prod.t_delivery_order.id = rcn_prod.t_delivery_order_operation.order_id
           AND rcn_prod.t_delivery_order_operation.operation_type = 401))
    UNION
    (SELECT id, jd_id, resume_id, 0 as label
     FROM rcn_prod.t_delivery_order
     WHERE EXISTS (
         SELECT 1 FROM rcn_prod.t_delivery_order_operation
         WHERE rcn_prod.t_delivery_order.id = rcn_prod.t_delivery_order_operation.order_id
           AND rcn_prod.t_delivery_order_operation.operation_type = 402))
    """
    return orderSql
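# Note: the comma-separated FROM lists above are implicit inner joins, so a row
# survives only if it matches in every joined table. Optional preview (sketch)
# before running the full pipeline:
# spark_session.sql(get_order_sql()).show(5)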
jdsql = get_jd_sql()
cv_sql = get_cv_sql()
order_sql = get_order_sql()
s = time.time()  # start timing the pipeline
def create_hive(dataframe, db, table):
    # Persist a dataframe as a Hive table, overwriting any existing table.
    dataframe.write.format("hive").mode("overwrite").saveAsTable('{}.{}'.format(db, table))
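# Usage note: tables written by create_hive can be read back with
# spark_session.table('my_test.rcn_jd_data'); the target database
# ('my_test' here) must already exist in the metastore.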
jd = spark_session.sql(jdsql)
cv = spark_session.sql(cv_sql)
order = spark_session.sql(order_sql).withColumnRenamed('id', 'order_id')
# Rename the JD primary key so it cannot clash with the order columns below.
jd = jd.withColumnRenamed('id', 'outer_jd_id')
create_hive(jd, 'my_test', 'rcn_jd_data')
# First join: attach JD features to every labelled order (inner join by
# default, so orders whose jd_id has no matching JD are dropped).
relation = order['jd_id'] == jd['outer_jd_id']
first_join = order.join(jd, relation)
first_join.show(5)
print(cv.columns)
assert 'id' in cv.columns
cv = cv.withColumnRenamed('id', 'outer_resume_id')
create_hive(cv, 'my_test', 'rcn_resume_data')
# Second join: attach resume features to the order/JD rows.
relation2 = first_join['resume_id'] == cv['outer_resume_id']
print(first_join.columns)
assert 'outer_resume_id' in cv.columns
second_join = first_join.join(cv, relation2)
second_join.show(5)
create_hive(second_join, 'my_test', 'matching_training_data')
print('cv', cv.count())
print('elapsed seconds:', time.time() - s)
# cv.toPandas().to_csv('alljd_hive.csv')
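# Optional sanity checks (sketch): inspect the label balance of the final table,
# and export it as CSV without collecting to the driver the way toPandas() would.
# The output path below is an assumption, not part of the original pipeline.
# second_join.groupBy('label').count().show()
# second_join.write.mode('overwrite').option('header', True).csv('/tmp/matching_training_data_csv')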