Cómo usar elasticsearch.helpers.streaming_bulk
¿Alguien puede aconsejar cómo usar la función elasticsearch.helpers.streaming_bulk en lugar de elasticsearch.helpers.bulk para indexar datos en elasticsearch.
Si simplemente cambio streaming_bulk en lugar de bulk, nada se indexa, por lo que supongo que debe usarse de forma diferente.
El código a continuación crea datos de índice, tipo e índice del archivo CSV en fragmentos de 500 elementos en elasticsearch. Está funcionando correctamente, pero estoy divagando si es posible aumentar el rendimiento previo. Es por eso que quiero probar la función streaming_bulk.
Actualmente necesito 10 minutos para indexar 1 millón de filas para documentos CSV de 200 MB. Yo uso dos máquinas, Centos 6.6 con 8 CPU-s, x86_64, CPU MHz: 2499.902, Mem: 15.574G en total. No estoy seguro de si puede ir más rápido.
es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file
es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)
with open(file_to_index, 'rb') as csvfile:
reader = csv.reader(csvfile) #read documents for indexing from CSV file, more than million rows
content = {"_index": index_name, "_type": type_name}
batch_chunks = []
iterator = 0
for row in reader:
var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
id_increment = id_increment + 1
#var = transform_row_for_indexing(row,fields, index_name, type_name)
batch_chunks.append(var)
if iterator % 500 == 0:
helpers.bulk(es,batch_chunks)
del batch_chunks[:]
print "ispucalo batch"
iterator = iterator + 1
# indexing of last batch_chunk
if len(batch_chunks) != 0:
helpers.bulk(es,batch_chunks)