标签: sparkIMF
比较完善的流处理系统会有三个方面:
- 在线的,有非常低的延迟来处理数据,而且是稳定可靠的。
- 它能够对流进来的数据进行非常复杂的分析,而不是简单的分析。
- 不仅能处理当前在线的数据,也能处理过去一天、一周、一个月甚至一年的数据。
##配置Kafka集群
###一:安装好zookeeper
配置环境变量:vim ~/bashrc 或者编辑/etc/profile文件
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.8
export PATH=$PATH:$ZOOKEEPER_HOME/bin
使设置生效:source ~/baserc
###三:安装Kafka集群
- 找到slf4j-1.7.21.zip文件中的slf4j-nop-1.7.21.jar,目的是让我们的Kafka在后台运行,所谓在后台运行就是在命令前端没有交互,对于我们的操作会很方便
- 把kafka_2.10-0.10.0.0文件夹放入linux的目录/usr/local目录下
- 配置环境变量vim ~/bashrc
export KAFKA_HOME=/usr/local/kafka_2.10-0.10.0.0
export PATH=$PATH:$KAFKA_HOME/bin
使设置生效:source ~/baserc
-
编辑config/server.properties文件 修改zookeeper.connect
zookeeper.connect=Master:2181,Worker3:2181,Worker4:2181
-
把Kafka目录分发到Worker3和Worker4
# scp -r /usr/local/kafka_2.10-0.10.0.0 root@Worker3:/usr/local/kafka_2.10-0.10.0.0/
# scp -r /usr/local/kafka_2.10-0.10.0.0 root@Worker4:/usr/local/kafka_2.10-0.10.0.0/
- 编辑config/server.properties文件 broker.id属性默认为0 把Worker3和Worker4分别设置为1和2
##集群启动Kafka
-
先启动zookeeper
# zkServer start
-
启动Kafka Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
-
启动Kafka
nohup bin/kafka-server-start.sh config/server.properties &
-
停止Kafka
bin/kafka-server-stop.sh
-
在Worker3和Worker4中也启动Kafka
ssh Worker3 cd /usr/local/kafka_2.10-0.10.0.0/ bin/kafka-server-start.sh config/server.properties & ssh Worker4 cd /usr/local/kafka_2.10-0.10.0.0/ bin/kafka-server-start.sh config/server.properties &
-
测试集群是否正常工作 Create a topic
# bin/kafka-topics.sh --create --zookeeper Master:2181,Worker3:2181,Worker4:2181 --replication-factor 1 --partitions 1 --topic HelloKafka # bin/kafka-topics.sh --list --zookeeper localhost:2181
Send some messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloKafka bin/kafka-console-producer.sh --broker-list Master:9092,Worker3:9092,Worker4:9092 --topic HelloKafka This is Spark! I'm Rocky!
Start a consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic HelloKafka --from-beginning bin/kafka-console-consumer.sh --zookeeper Master:2181,Worker3:2181,Worker4:2181 --topic HelloKafka --from-beginning
##单机模式运行Kafka(Windows系统)
- 由于Kafka需要Zookeeper,首先启动Zookeeper
zookeeper-server-start G:/runtime/kafka_2.11-0.10.0.0/config/zookeeper.properties
- 启动Kafka
kafka-server-start G:/runtime/kafka_2.11-0.10.0.0/config/server.properties
- Create a topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HelloKafka
kafka-topics --list --zookeeper localhost:2181
- Send some messages
kafka-console-producer --broker-list localhost:9092 --topic HelloKafka
输入内容:
Hello Spark
Hello Scala
Hello Java
Hello Hadoop
- Start a consumer
kafka-console-consumer --zookeeper localhost:2181 --topic HelloKafka --from-beginning
观察结果:
Hello Spark
Hello Scala
Hello Java
Hello Hadoop