Wie führe ich eine Switch-Anweisung mit Apache Spark Dataframes (Python) durch?
Ich versuche, eine Operation für meine Daten auszuführen, bei der ein bestimmter Wert einer Liste vordefinierter Werte zugeordnet wird, wenn er einem der Kriterien entspricht, oder ansonsten einem Fall-Through-Wert.
Dies wäre das Äquivalent zu SQL:
CASE
WHEN user_agent LIKE \'%CanvasAPI%\' THEN \'api\'
WHEN user_agent LIKE \'%candroid%\' THEN \'mobile_app_android\'
WHEN user_agent LIKE \'%iCanvas%\' THEN \'mobile_app_ios\'
WHEN user_agent LIKE \'%CanvasKit%\' THEN \'mobile_app_ios\'
WHEN user_agent LIKE \'%Windows NT%\' THEN \'desktop\'
WHEN user_agent LIKE \'%MacBook%\' THEN \'desktop\'
WHEN user_agent LIKE \'%iPhone%\' THEN \'mobile\'
WHEN user_agent LIKE \'%iPod Touch%\' THEN \'mobile\'
WHEN user_agent LIKE \'%iPad%\' THEN \'mobile\'
WHEN user_agent LIKE \'%iOS%\' THEN \'mobile\'
WHEN user_agent LIKE \'%CrOS%\' THEN \'desktop\'
WHEN user_agent LIKE \'%Android%\' THEN \'mobile\'
WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\'
WHEN user_agent LIKE \'%Mac OS%\' THEN \'desktop\'
WHEN user_agent LIKE \'%Macintosh%\' THEN \'desktop\'
ELSE \'other_unknown\'
END AS user_agent_type
Ich bin ein Neuling in Spark, und so verwendet mein erster Versuch mit diesem Programm ein Nachschlagewörterbuch und passt die Werte zeilenweise in einem @ aRDD
wie so:
USER_AGENT_VALS = {
'CanvasAPI': 'api',
'candroid': 'mobile_app_android',
'iCanvas': 'mobile_app_ios',
'CanvasKit': 'mobile_app_ios',
'Windows NT': 'desktop',
'MacBook': 'desktop',
'iPhone': 'mobile',
'iPod Touch': 'mobile',
'iPad': 'mobile',
'iOS': 'mobile',
'CrOS': 'desktop',
'Android': 'mobile',
'Linux': 'desktop',
'Mac OS': 'desktop',
'Macintosh': 'desktop'
}
def parse_requests(line: list,
id_data: dict,
user_vals: dict = USER_AGENT_VALS):
"""
Expects an input list which maps to the following indexes:
0: user_id
1: context(course)_id
2: request_month
3: user_agent_type
:param line: A list of values.
:return: A list
"""
found = False
for key, value in user_vals.items():
if key in line[3]:
found = True
line[3] = value
if not found:
line[3] = 'other_unknown'
# Retrieves the session_id count from the id_data dictionary using
# the user_id as the key.
session_count = id_data[str(line[0])]
line.append(session_count)
line.extend(config3.ETL_LIST)
return [str(item) for item in line]
Mein aktueller Code enthält die Daten in einemdataframe
, und ich bin mir nicht ganz sicher, wie ich den obigen Vorgang am effizientesten ausführen soll. Ich weiß, dass sie unveränderlich sind, daher muss sie als neuer Datenrahmen zurückgegeben werden, aber meine Frage ist, wie dies am besten funktioniert. Hier ist mein Code:
from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import current_date, date_format, lit, StringType
EMR_CLIENT = client('emr')
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# for dependencies
# sc.addPyFile()
USER_AGENT_VALS = {
'CanvasAPI': 'api',
'candroid': 'mobile_app_android',
'iCanvas': 'mobile_app_ios',
'CanvasKit': 'mobile_app_ios',
'Windows NT': 'desktop',
'MacBook': 'desktop',
'iPhone': 'mobile',
'iPod Touch': 'mobile',
'iPad': 'mobile',
'iOS': 'mobile',
'CrOS': 'desktop',
'Android': 'mobile',
'Linux': 'desktop',
'Mac OS': 'desktop',
'Macintosh': 'desktop'
}
if __name__ == '__main__':
df = sql_context.read.parquet(
r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/'
r'usage_data.gz.parquet')
course_data = df.filter(df['context_type'] == 'Course')
request_data = df.select(
df['user_id'],
df['context_id'].alias('course_id'),
date_format(df['request_timestamp'], 'MM').alias('request_month'),
df['user_agent']
)
sesh_id_data = df.groupBy('user_id').count()
joined_data = request_data.join(
sesh_id_data,
on=request_data['user_id'] == sesh_id_data['user_id']
).drop(sesh_id_data['user_id'])
all_fields = joined_data.withColumn(
'etl_requests_usage', lit('DEV')
).withColumn(
'etl_datetime_local', current_date()
).withColumn(
'etl_transformation_name', lit('agg_canvas_logs_user_agent_types')
).withColumn(
'etl_pdi_version', lit(r'Apache Spark')
).withColumn(
'etl_pdi_build_version', lit(r'1.6.1')
).withColumn(
'etl_pdi_hostname', lit(r'N/A')
).withColumn(
'etl_pdi_ipaddress', lit(r'N/A')
).withColumn(
'etl_checksum_md5', lit(r'N/A')
)
Als PS, gibt es eine bessere Möglichkeit, Spalten hinzuzufügen, als ich es getan habe?