Skip to content

Commit

Permalink
init kafka demo
Browse files Browse the repository at this point in the history
  • Loading branch information
孙显顺 committed Aug 2, 2017
1 parent 05152e7 commit 13d9827
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
Binary file added bigdata/kafkademo/lib/metrics-core-2.2.0.jar
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.sxshunrj.test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MessageReceive extends Thread{

private final ConsumerConnector consumer;
private String topic;

public MessageReceive() {
consumer =kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = "test";
}

private static ConsumerConfig createConsumerConfig() {

Properties props = new Properties();
props.put("zookeeper.connect", "192.168.10.14:2181");

//group 代表一个消费组
props.put("group.id", "group1");

//zk连接超时
props.put("zookeeper.session.timeout.ms", "100");
props.put("zookeeper.sync.time.ms", "100");
props.put("auto.commit.interval.ms", "100");
props.put("auto.offset.reset", "none");
//序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");

//解决重试次数
props.put("rebalance.max.retries", "5");
props.put("rebalance.backoff.ms", "10");

return new ConsumerConfig(props);
}

public void run(){
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]> stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it = stream.iterator();

System.out.println("****************************Results****************************");
while(true){
if(it.hasNext()){
String msgContent= new String(it.next().message());
System.out.println("Receive Message Data:" +msgContent);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}
65 changes: 65 additions & 0 deletions bigdata/kafkademo/src/main/java/com/sxshunrj/test/MessageSend.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.sxshunrj.test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageSend {
private static Logger logger = LoggerFactory.getLogger(MessageSend.class);

private final Producer<String, String> producer;

private String topic;

public MessageSend()
{
Properties props = new Properties();

//kafka消息主题
topic="test3";

//此处配置的是kafka的端口
props.put("metadata.broker.list", "namenode1:9092");

//配置value的序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
//配置key的序列化类
props.put("key.serializer.class", "kafka.serializer.StringEncoder");

//request.required.acks
//0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
//1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
//-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
props.put("request.required.acks","0");

producer = new Producer<String, String>(new ProducerConfig(props));
}

public boolean sendMessage(String msgBody)
{
boolean bRet=false;
try {
producer.send(new KeyedMessage<String, String>(topic, msgBody));
bRet=true;
} catch (Exception e) {
e.printStackTrace();
}
return bRet;
}

public void stop() {
if(producer!=null){
producer.close();
}
}

public static void main(String[] args) {
MessageSend messageSend = new MessageSend();
messageSend.sendMessage("{'amount':63.50375137330458,'category':'TOY','order_time':1477415932581,'device':'Other','qty':4,'user':{'id':'bf249f36-f593-4307-b156-240b3094a1c3','age':21,'gender':'Male'},'currency':'USD','country':'CHINA'}");
}
}

0 comments on commit 13d9827

Please sign in to comment.