Как выполнить оператор Switch с Apache Spark Dataframes (Python)

Я пытаюсь выполнить операцию с моими данными, когда определенное значение будет отображено в список предварительно определенных значений, если оно соответствует одному из критериев, или в противном случае - к значению сброса.

Это будет эквивалентный SQL:

CASE
            WHEN user_agent LIKE \'%CanvasAPI%\' THEN \'api\'
            WHEN user_agent LIKE \'%candroid%\' THEN \'mobile_app_android\'
            WHEN user_agent LIKE \'%iCanvas%\' THEN \'mobile_app_ios\'
            WHEN user_agent LIKE \'%CanvasKit%\' THEN \'mobile_app_ios\'
            WHEN user_agent LIKE \'%Windows NT%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%MacBook%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%iPhone%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iPod Touch%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iPad%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iOS%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%CrOS%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Android%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Mac OS%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Macintosh%\' THEN \'desktop\'
            ELSE \'other_unknown\'
            END AS user_agent_type

Я довольно новичок в Spark, и поэтому моя первая попытка использования этой программы использует словарь поиска и корректирует значения построчно вRDD вот так:

USER_AGENT_VALS = {
    'CanvasAPI': 'api',
    'candroid': 'mobile_app_android',
    'iCanvas': 'mobile_app_ios',
    'CanvasKit': 'mobile_app_ios',
    'Windows NT': 'desktop',
    'MacBook': 'desktop',
    'iPhone': 'mobile',
    'iPod Touch': 'mobile',
    'iPad': 'mobile',
    'iOS': 'mobile',
    'CrOS': 'desktop',
    'Android': 'mobile',
    'Linux': 'desktop',
    'Mac OS': 'desktop',
    'Macintosh': 'desktop'
}

def parse_requests(line: list,
                   id_data: dict,
                   user_vals: dict = USER_AGENT_VALS):
    """
    Expects an input list which maps to the following indexes:
        0: user_id
        1: context(course)_id
        2: request_month
        3: user_agent_type

    :param line: A list of values.
    :return: A list
    """
    found = False
    for key, value in user_vals.items():
        if key in line[3]:
            found = True
            line[3] = value
    if not found:
        line[3] = 'other_unknown'
    # Retrieves the session_id count from the id_data dictionary using
    # the user_id as the key.
    session_count = id_data[str(line[0])]
    line.append(session_count)
    line.extend(config3.ETL_LIST)
    return [str(item) for item in line]

Мой текущий код содержит данные вdataframe, и я не совсем уверен, как выполнить вышеуказанную операцию наиболее эффективно. Я знаю, что они являются неизменяемыми, поэтому их нужно возвращать как новый фрейм данных, но мой вопрос заключается в том, как лучше всего это сделать. Вот мой код:

from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import current_date, date_format, lit, StringType

EMR_CLIENT = client('emr')
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# for dependencies
# sc.addPyFile()

USER_AGENT_VALS = {
    'CanvasAPI': 'api',
    'candroid': 'mobile_app_android',
    'iCanvas': 'mobile_app_ios',
    'CanvasKit': 'mobile_app_ios',
    'Windows NT': 'desktop',
    'MacBook': 'desktop',
    'iPhone': 'mobile',
    'iPod Touch': 'mobile',
    'iPad': 'mobile',
    'iOS': 'mobile',
    'CrOS': 'desktop',
    'Android': 'mobile',
    'Linux': 'desktop',
    'Mac OS': 'desktop',
    'Macintosh': 'desktop'
}

if __name__ == '__main__':
    df = sql_context.read.parquet(
        r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/'
        r'usage_data.gz.parquet')

    course_data = df.filter(df['context_type'] == 'Course')
    request_data = df.select(
        df['user_id'],
        df['context_id'].alias('course_id'),
        date_format(df['request_timestamp'], 'MM').alias('request_month'),
        df['user_agent']
    )

    sesh_id_data = df.groupBy('user_id').count()

    joined_data = request_data.join(
        sesh_id_data,
        on=request_data['user_id'] == sesh_id_data['user_id']
    ).drop(sesh_id_data['user_id'])

    all_fields = joined_data.withColumn(
        'etl_requests_usage', lit('DEV')
    ).withColumn(
        'etl_datetime_local', current_date()
    ).withColumn(
        'etl_transformation_name', lit('agg_canvas_logs_user_agent_types')
    ).withColumn(
        'etl_pdi_version', lit(r'Apache Spark')
    ).withColumn(
        'etl_pdi_build_version', lit(r'1.6.1')
    ).withColumn(
        'etl_pdi_hostname', lit(r'N/A')
    ).withColumn(
        'etl_pdi_ipaddress', lit(r'N/A')
    ).withColumn(
        'etl_checksum_md5', lit(r'N/A')
    )

Как PS, есть ли лучший способ добавить столбцы, чем я это сделал?