Python Cassandra Treiber gleiche Insert-Performance wie copy
Ich versuche, Python async mit Cassandra zu verwenden, um festzustellen, ob ich Datensätze schneller als mit dem Befehl CQL COPY in Cassandra schreiben kann.
Mein Python-Code sieht so aus:
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])
session = cluster.connect('test')
with open('dataImport.txt') as f:
for line in f:
query = SimpleStatement (
"INSERT INTO tstTable (id, accts, info) VALUES (%s) " %(line),
consistency_level=ConsistencyLevel.ONE)
session.execute_async (query)
Aber es bietet mir die gleiche Leistung wie der Befehl COPY ... ungefähr 2.700 Zeilen / Sek. ... sollte es mit async schneller sein?
Muss ich Multithreading in Python verwenden? Ich lese nur darüber, bin mir aber nicht sicher, wie es dazu passt ...
BEARBEITEN
so habe ich etwas online gefunden, das ich zu ändern versuche, aber nicht ganz zum Laufen bringen kann ... Ich habe dies bisher..auch ich habe die Datei in 3 Dateien aufgeteilt in / Data / toImport / dir:
import multiprocessing
import time
import os
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])
session = cluster.connect('test')
def mp_worker(inputArg):
with open(inputArg[0]) as f:
for line in f:
query = SimpleStatement (
"INSERT INTO CustInfo (cust_id, accts, offers) values (%s)" %(line),
consistency_level=ConsistencyLevel.ONE)
session.execute_async (query)
def mp_handler(inputData, nThreads = 8):
p = multiprocessing.Pool(nThreads)
p.map(mp_worker, inputData, chunksize=1)
p.close()
p.join()
if __name__ == '__main__':
temp_in_data = file_list
start = time.time()
in_dir = '/Data/toImport/'
N_Proc = 8
file_data = [(in_dir) for i in temp_in_data]
print '----------------------------------Start Working!!!!-----------------------------'
print 'Number of Processes using: %d' %N_Proc
mp_handler(file_data, N_Proc)
end = time.time()
time_elapsed = end - start
print '----------------------------------All Done!!!!-----------------------------'
print "Time elapsed: {} seconds".format(time_elapsed)
aber diesen Fehler bekommen:
Traceback (most recent call last):
File "multiCass.py", line 27, in <module>
temp_in_data = file_list
NameError: name 'file_list' is not defined