Não foi possível carregar corretamente os dados do avro do twitter na tabela de seção
Precisa da sua ajuda!
Estou tentando um exercício trivial de obter os dados do twitter e carregá-los no Hive para análise. Embora eu seja capaz de obter dados no HDFS usando o flume (usando o Twitter 1% firehose Source) e também posso carregar os dados na tabela Hive.
Mas incapaz de ver todas as colunas que eu esperava estar nos dados do twitter, como local_do_usuário, descrição_do_usuário, número_de_usuário_de_usuário, descrição_de_usuário, descrição_do_usuário. O esquema derivado do Avro contém apenas duas colunas cabeçalho e corpo.
Abaixo estão as etapas que eu executei:
1) crie um agente flume com o conf abaixo:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =org.apache.flume.source.twitter.TwitterSource
#a1.sources.r1.type = com.cloudera.flume.source.TwitterSource
a1.sources.r1.consumerKey =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.consumerSecret =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessToken =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessTokenSecret =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.keywords = bigdata, healthcare, oozie
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.inUsePrefix = _
a1.sinks.k1.hdfs.fileSuffix = .avro
# added for invalid block size error
a1.sinks.k1.serializer = avro_event
#a1.sinks.k1.deserializer.schemaType = LITERAL
# added for exception java.io.IOException:org.apache.avro.AvroTypeException: Found Event, expecting Doc
#a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.rollSize = 67108864
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 30
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) Derivar o esquema do arquivo de dados avro; não faço ideia do porquê do esquema derivado do arquivo de dados avro ter apenas duas colunas cabeçalho e corpo:
java -jar avro-tools-1.7.7.jar getschema FlumeData.14315982 30978.avro
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "headers",
"type" : {
"type" : "map",
"values" : "string"
}
}, {
"name" : "body",
"type" : "bytes"
} ]
}
3) Execute o agente acima e obtenha os dados no HDFS, descubra o esquema dos dados do avro e crie uma tabela do Hive como:
CREATE EXTERNAL TABLE TwitterData
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "headers",
"type" : {
"type" : "map",
"values" : "string"
}
}, {
"name" : "body",
"type" : "bytes"
} ]
}
')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter'
;
4) Descreva a Tabela do Hive:
hive> describe twitterdata;
OK
headers map<string,string> from deserializer
body binary from deserializer
Time taken: 0.472 seconds, Fetched: 2 row(s)
5) Consultar a tabela: Ao consultar a tabela, vejo os dados binários na coluna 'body' e as informações reais do esquema na coluna 'header'.
select * from twitterdata limit 1;
OK
{"type":"record","name":"Doc","doc":"adoc","fields":[{"name":"id","type":"string"},{"name":"user_friends_count","type":["int","null"]},{"name":"user_location","type":["string","null"]},{"name":"user_description","type":["string","null"]},{"name":"user_statuses_count","type":["int","null"]},{"name":"user_followers_count","type":["int","null"]},{"name":"user_name","type":["string","null"]},{"name":"user_screen_name","type":["string","null"]},{"name":"created_at","type":["string","null"]},{"name":"text","type":["string","null"]},{"name":"retweet_count","type":["long","null"]},{"name":"retweeted","type":["boolean","null"]},{"name":"in_reply_to_user_id","type":["long","null"]},{"name":"source","type":["string","null"]},{"name":"in_reply_to_status_id","type":["long","null"]},{"name":"media_url_https","type":["string","null"]},{"name":"expanded_url","type":["string","null"]}]}�1|$���)]'��G�$598792495703543808�Bあいたぁぁぁぁぁぁぁ!�~�ゆっけ0725Yukken(2015-05-14T10:10:30Z<ん?なんか意味違うわ�<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>�1|$���)]'��
Time taken: 2.24 seconds, Fetched: 1 row(s)
Como criar uma tabela de seção com todas as colunas no esquema real, como mostrado na coluna 'cabeçalho'. Quero dizer, com todas as colunas como local_do_usuário, descrição_de_usuário, conta_de_usuário_amigos, descrição_de_usuário, descrição_estado_de_usuário?
O esquema derivado do arquivo de dados avro não deve conter mais colunas?
Existe algum problema com a fonte flume-avro que usei no agente flume (org.apache.flume.source.twitter.TwitterSource)?
Obrigado pela leitura.
Obrigado Farrukh, cometi que o erro foi a configuração 'a1.sinks.k1.serializer = avro_event', mudei para 'a1.sinks.k1.serializer = texto' e consegui carregar os dados no Hive . Mas agora que o problema está recuperando os dados do Hive, estou recebendo o erro abaixo ao fazer isso:
hive> describe twitterdata_09062015;
OK
id string from deserializer
user_friends_count int from deserializer
user_location string from deserializer
user_description string from deserializer
user_statuses_count int from deserializer
user_followers_count int from deserializer
user_name string from deserializer
user_screen_name string from deserializer
created_at string from deserializer
text string from deserializer
retweet_count bigint from deserializer
retweeted boolean from deserializer
in_reply_to_user_id bigint from deserializer
source string from deserializer
in_reply_to_status_id bigint from deserializer
media_url_https string from deserializer
expanded_url string from deserializer
select count(1) as num_rows from TwitterData_09062015;
Query ID = root_20150609130404_10ef21db-705a-4e94-92b7-eaa58226ee2e
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1433857038961_0003, Tracking URL = http://sandbox.hortonworks.com:8088/proxy/application_14338570 38961_0003/
Kill Command = /usr/hdp/2.2.0.0-2041/hadoop/bin/hadoop job -kill job_1433857038961_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
* 13:04:36,856 Stage-1 map = 0%, reduce = 0%
* 13:05:09,576 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1433857038961_0003 with errors
Error during job, obtaining debugging information...
Examining task ID: task_1433857038961_0003_m_000000 (and more) from job job_1433857038961_0003
Task with the most failures(4):
Task ID:
task_1433857038961_0003_m_000000
URL:
http://sandbox.hortonworks.com:8088/taskdetails.jsp?jobid=job_1433857038961_0003&tipid=task_1433857038961_0003_m_0 00000
Diagnostic Messages for this Task:
Error: java.io.IOException: java.io.IOException: org.apache.avro.AvroRuntimeException: java.io.IOException: Block si ze invalid or too large for this implementation: -40
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHand lerChain.java:121)