Como usar elasticsearch.helpers.streaming_bulk

Alguém pode aconselhar como usar a função elasticsearch.helpers.streaming_bulk, em vez elasticsearch.helpers.bulk para indexar dados no elasticsearch.

Se eu simplesmente mudar streaming_bulk em vez de em massa, nada será indexado, então acho que precisa ser usado de forma diferente.

O código abaixo cria dados de índice, tipo e índice do arquivo CSV em pedaços de 500 elemens na pesquisa elástica. Está funcionando corretamente, mas estou vagando, é possível aumentar o desempenho. É por isso que quero experimentar a função streaming_bulk.

Atualmente, preciso de 10 minutos para indexar 1 milhão de linhas para um documento CSV de 200 MB. Eu uso duas máquinas, Centos 6.6 com 8 CPU-s, x86_64, CPU MHz: 2499.902, Mem: 15.574G total. Não tenho certeza se pode ir mais rápido.

es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file

es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)        #read documents for indexing from CSV file, more than million rows
    content = {"_index": index_name, "_type": type_name}
    batch_chunks = []
    iterator = 0

    for row in reader:
        var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
        id_increment = id_increment + 1
        #var = transform_row_for_indexing(row,fields, index_name, type_name)
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es,batch_chunks)
            del batch_chunks[:]
            print "ispucalo batch"
        iterator = iterator + 1
    # indexing of last batch_chunk
    if len(batch_chunks) != 0:
        helpers.bulk(es,batch_chunks)

questionAnswers(2)

yourAnswerToTheQuestion