初始Kafka

消息队列

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性的架构,是大型分布式系统中不可缺少的中间件

使用场景

异步处理

串行方式:用户注册->写入数据库->发送注册邮件->发送注册短信

并行方式:用户注册->写入数据库->发送邮件并且发送短信

消息队列:用户注册->写入数据库->写入消息队列->发送邮件和短信的消费者异步读取消息队列,写入消息队列即将结果返回给客户端

流量大的时候redis并不可靠

应用解耦
用户下单后,订单系统需要通知库存系统

传统方式:订单系统调用库存系统的接口

消息队列:

    订单系统:当用户下单后,将订单内容写入消息队列,返回订单下单成功

    库存系统:订阅下单的消息,采用拉/推的方式获取下单的信息,库存系统根据下单的信息,进行库存操作

流量削锋
秒杀活动,因为流量过大,导致流量暴增

传统方式:突然接受来自前端大量订单请求

消息队列:在应用前端加入消息队列

用户请求,服务器接收后首先写入消息队列,超过最大数量,直接抛弃用户请求或者跳转到错误页面,

秒杀业务根据消息队列的信息进行相关处理

日志处理

解决大量日志传输的问题
日志采集客户端负责日志数据采集写入Kafka

Kafka消息队列负责日志数据的接受、存储和转发

日志处理应用:订阅并消费Kafka队列中的日志数据

生产环境中,使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka、RocketMQ

Kafka简介

Kafka是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者规模较大的网站中的所有动作流数据

kafka.apache.org

优势:
在普通的硬件上也支持每秒数百万的消息

支持Kafka服务器和消费机集群来区分消息

关键概念

Broker:kafka集群中的一台或者多台服务器统称为broker

Topic:kafka处理的消息源的不同的分类

Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,partition中的每个消息都会被分配一个有序的id

Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息

Producers:消息和数据生产者,向kafka的一个topic发布消息的过程叫做producers

Consumers:消息和数据消费者,订阅topic并处理其发布的消息的过程叫做consumers

使用场景

安装

安装JAVA环境安装JDK
http://www.oracle.com/technetwork/java/javase/downloads/jdk9-downloads-3848520.html

1
2
3
4
5
6
7
8
9
mkdir /usr/jdk
cp jdk.gz /usr/jdk
tar -xzvf jdk.gz
vim /etc/profile
#添加一下内容
JAVA_HOME=/usr/jdk
CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH

source /etc/profile

安装kafka

1
2
3
4
5
6
7
8
#启动zookeeper server
bin/zookeeper-server-start.sh config/zookeeper.properties &
#启动kafka
bin/kafka-server-start.sh config/server.properties &
#producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zhyunfe --from-beginning

结合PHP使用

安装php扩展
需要打一个库librdkafka:https://github.com/edenhill/librdkafka.git

1
2
3
4
5
6
git clone https://github.com/edenhill/librdkafka.git

tar zxvf librdkafka
cd librdkafka
./configure
make && make install

php-rdkafka:https://github.com/arnaud-lb/php-rdkafka

1
2
3
4
5
6
7
8
9
10
11
git clone https://github.com/arnaud-lb/php-rdkafka

tar -zxvf php-rdkafka
cd php-rdkafka
phpize
./configure --with-php-confif=/usr/local/Cellar/php/lib/php-config
make && make install
#编辑php.ini
vim /usr/local/etc/php/7.0/php.ini
extension=rdkafka.so
重启php

使用kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php
/**
* Created by PhpStorm.
* User: zhyunfe
* Date: 2017/12/10
* Time: 下午9:15
*/
namespace app\index\model;
use think\image\Exception;
class Kafka
{
public $broker_list = '192.168.37.140:9092';
public $topic = "topic";
public $partition = 0;
public $logFile = '';

protected $producer = null;
protected $consumer = null;
public function __construct()
{
if (empty($this->broker_list)) {
throw new Exception('broker list not init');
}
$rk = new \RdKafka\Producer();
if (empty($rk)) {
throw new Exception('rdkafka not iniit');
}

$rk->setLogLevel(LOG_DEBUG);
if (!$rk->addBrokers($this->broker_list)) {
throw new Exception('broker_list not init');
}

$this->producer = $rk;

}
public function send($message = [])
{
$topic = $this->producer->newTopic($this->topic);
var_dump($topic);die();
return $topic->produce(RD_KAFKA_PARTITION_UA,$this->partition,json_encode($message));
}
}