Cómo realizar una instrucción Switch con Apache Spark Dataframes (Python)

Estoy tratando de realizar una operación en mis datos donde un cierto valor se asignará a una lista de valores predeterminados si coincide con uno de los criterios, o con un valor de caída de lo contrario.

Este sería el SQL equivalente:

CASE
            WHEN user_agent LIKE \'%CanvasAPI%\' THEN \'api\'
            WHEN user_agent LIKE \'%candroid%\' THEN \'mobile_app_android\'
            WHEN user_agent LIKE \'%iCanvas%\' THEN \'mobile_app_ios\'
            WHEN user_agent LIKE \'%CanvasKit%\' THEN \'mobile_app_ios\'
            WHEN user_agent LIKE \'%Windows NT%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%MacBook%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%iPhone%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iPod Touch%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iPad%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iOS%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%CrOS%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Android%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Mac OS%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Macintosh%\' THEN \'desktop\'
            ELSE \'other_unknown\'
            END AS user_agent_type

Soy bastante nuevo en Spark, por lo que mi primer intento en este programa utiliza un diccionario de búsqueda y ajusta los valores línea por línea en unRDD al igual que:

USER_AGENT_VALS = {
    'CanvasAPI': 'api',
    'candroid': 'mobile_app_android',
    'iCanvas': 'mobile_app_ios',
    'CanvasKit': 'mobile_app_ios',
    'Windows NT': 'desktop',
    'MacBook': 'desktop',
    'iPhone': 'mobile',
    'iPod Touch': 'mobile',
    'iPad': 'mobile',
    'iOS': 'mobile',
    'CrOS': 'desktop',
    'Android': 'mobile',
    'Linux': 'desktop',
    'Mac OS': 'desktop',
    'Macintosh': 'desktop'
}

def parse_requests(line: list,
                   id_data: dict,
                   user_vals: dict = USER_AGENT_VALS):
    """
    Expects an input list which maps to the following indexes:
        0: user_id
        1: context(course)_id
        2: request_month
        3: user_agent_type

    :param line: A list of values.
    :return: A list
    """
    found = False
    for key, value in user_vals.items():
        if key in line[3]:
            found = True
            line[3] = value
    if not found:
        line[3] = 'other_unknown'
    # Retrieves the session_id count from the id_data dictionary using
    # the user_id as the key.
    session_count = id_data[str(line[0])]
    line.append(session_count)
    line.extend(config3.ETL_LIST)
    return [str(item) for item in line]

Mi código actual tiene los datos en undataframe, y no estoy exactamente seguro de cómo realizar la operación anterior de la manera más eficiente. Sé que son inmutables, por lo que debe devolverse como un nuevo marco de datos, pero mi pregunta es cuál es la mejor manera de hacerlo. Aquí está mi código:

from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import current_date, date_format, lit, StringType

EMR_CLIENT = client('emr')
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# for dependencies
# sc.addPyFile()

USER_AGENT_VALS = {
    'CanvasAPI': 'api',
    'candroid': 'mobile_app_android',
    'iCanvas': 'mobile_app_ios',
    'CanvasKit': 'mobile_app_ios',
    'Windows NT': 'desktop',
    'MacBook': 'desktop',
    'iPhone': 'mobile',
    'iPod Touch': 'mobile',
    'iPad': 'mobile',
    'iOS': 'mobile',
    'CrOS': 'desktop',
    'Android': 'mobile',
    'Linux': 'desktop',
    'Mac OS': 'desktop',
    'Macintosh': 'desktop'
}

if __name__ == '__main__':
    df = sql_context.read.parquet(
        r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/'
        r'usage_data.gz.parquet')

    course_data = df.filter(df['context_type'] == 'Course')
    request_data = df.select(
        df['user_id'],
        df['context_id'].alias('course_id'),
        date_format(df['request_timestamp'], 'MM').alias('request_month'),
        df['user_agent']
    )

    sesh_id_data = df.groupBy('user_id').count()

    joined_data = request_data.join(
        sesh_id_data,
        on=request_data['user_id'] == sesh_id_data['user_id']
    ).drop(sesh_id_data['user_id'])

    all_fields = joined_data.withColumn(
        'etl_requests_usage', lit('DEV')
    ).withColumn(
        'etl_datetime_local', current_date()
    ).withColumn(
        'etl_transformation_name', lit('agg_canvas_logs_user_agent_types')
    ).withColumn(
        'etl_pdi_version', lit(r'Apache Spark')
    ).withColumn(
        'etl_pdi_build_version', lit(r'1.6.1')
    ).withColumn(
        'etl_pdi_hostname', lit(r'N/A')
    ).withColumn(
        'etl_pdi_ipaddress', lit(r'N/A')
    ).withColumn(
        'etl_checksum_md5', lit(r'N/A')
    )

Como PS, ¿hay una mejor manera de agregar columnas que la forma en que lo hice?

Respuestas a la pregunta(1)

Su respuesta a la pregunta