Erro ao usar a tabela INSERT INTO ON KEY DUPLICATE, usando uma matriz de loop for

Estou trabalhando na atualização de um banco de dados mysql usando a estrutura pyspark e executando nos serviços do AWS Glue.

Eu tenho um quadro de dados da seguinte maneira:

df2= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103","81A01","TERR NAME 01","NJ")], ["zip_code","territory_code","territory_name","state"])

# Print out information about this data
df2.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
|    xxx1|         81A01|  TERR NAME 55|   NY|
|    xxx2|         81A01|  TERR NAME 55|   NY|
|    x103|         81A01|  TERR NAME 01|   NJ|
+---------------------------------------------

Eu tenho uma chave primária ZIP_CODE e preciso garantir que não haja chaves duplicadas ou exceções de chave primária e, portanto, estou usando INSERT INTO .... ON DUPLICATE KEYS.

E como tenho mais de uma linha para inserir / atualizar, usei o array em python para percorrer os registros e executar INSERT no banco de dados. O código é o seguinte:

sarry = df2.collect()
for r in sarry:
     db = MySQLdb.connect("xxxx.rds.amazonaws.com", "username", "password", 
      "databasename")
     cursor = db.cursor()
     insertQry=INSERT INTO ZIP_TERR(zip_code, territory_code, territory_name, 
     state) VALUES(r.zip_code, r.territory_code, r.territory_name, r.state) ON 
     DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), state = 
     VALUES(state);"
     n=cursor.execute(insertQry)
     db.commit()
     db.close()

Ao executar a função de consulta de inserção acima, estou recebendo a seguinte mensagem de erro, não obtive nenhuma pista sobre o erro. Por favor ajude.

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-2291407229037300959.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-2291407229037300959.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 8, in <module>
  File "/usr/local/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/local/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 893, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 1103, in _read_query_result
    result.read()
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 1396, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 1059, in _read_packet
    packet.check_error()
  File "/usr/local/lib/python2.7/site-packages/pymysql/connections.py", line 384, in check_error
    err.raise_mysql_exception(self._data)
  File "/usr/local/lib/python2.7/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
    raise errorclass(errno, errval)
InternalError: (1054, u"Unknown column 'r.zip_code' in 'field list'")

Se eu simplesmente tentar imprimir os valores para uma linha, estou obtendo os valores impressos da seguinte maneira:

print('zip_code_new: ', r.zip_code, r.territory_code, r.territory_name, r.state)

zip_code_new:  xxx1 81A01 TERR NAME 55 NY

Obrigado. Como estou trabalhando no AWS Glue / Pyspark, preciso usar as bibliotecas python nativas.

questionAnswers(1)

yourAnswerToTheQuestion