Python Cassandra Treiber gleiche Insert-Performance wie copy

Ich versuche, Python async mit Cassandra zu verwenden, um festzustellen, ob ich Datensätze schneller als mit dem Befehl CQL COPY in Cassandra schreiben kann.

Mein Python-Code sieht so aus:

from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])

session = cluster.connect('test')

with open('dataImport.txt') as f:
    for line in f:
        query = SimpleStatement (
            "INSERT INTO tstTable (id, accts, info) VALUES (%s) " %(line),
            consistency_level=ConsistencyLevel.ONE)
        session.execute_async (query)

Aber es bietet mir die gleiche Leistung wie der Befehl COPY ... ungefähr 2.700 Zeilen / Sek. ... sollte es mit async schneller sein?

Muss ich Multithreading in Python verwenden? Ich lese nur darüber, bin mir aber nicht sicher, wie es dazu passt ...

BEARBEITEN

so habe ich etwas online gefunden, das ich zu ändern versuche, aber nicht ganz zum Laufen bringen kann ... Ich habe dies bisher..auch ich habe die Datei in 3 Dateien aufgeteilt in / Data / toImport / dir:

import multiprocessing
import time
import os
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement


cluster = Cluster(['1.2.1.4'])

session = cluster.connect('test')

def mp_worker(inputArg):
        with open(inputArg[0]) as f:
            for line in f:
                query = SimpleStatement (
                    "INSERT INTO CustInfo (cust_id, accts, offers) values (%s)" %(line),
                    consistency_level=ConsistencyLevel.ONE)
                session.execute_async (query)


def mp_handler(inputData, nThreads = 8):
    p = multiprocessing.Pool(nThreads)
    p.map(mp_worker, inputData, chunksize=1)
    p.close()
    p.join()

if __name__ == '__main__':
    temp_in_data = file_list
    start = time.time()
    in_dir = '/Data/toImport/'
    N_Proc = 8
    file_data = [(in_dir) for i in temp_in_data]

    print '----------------------------------Start Working!!!!-----------------------------'
    print 'Number of Processes using: %d' %N_Proc
    mp_handler(file_data, N_Proc)
    end = time.time()
    time_elapsed = end - start
    print '----------------------------------All Done!!!!-----------------------------'
    print "Time elapsed: {} seconds".format(time_elapsed)

aber diesen Fehler bekommen:

Traceback (most recent call last):
  File "multiCass.py", line 27, in <module>
    temp_in_data = file_list
NameError: name 'file_list' is not defined

Antworten auf die Frage(4)

Ihre Antwort auf die Frage