消息队列
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性的架构,是大型分布式系统中不可缺少的中间件
使用场景
异步处理
串行方式:用户注册->写入数据库->发送注册邮件->发送注册短信
并行方式:用户注册->写入数据库->发送邮件并且发送短信
消息队列:用户注册->写入数据库->写入消息队列->发送邮件和短信的消费者异步读取消息队列,写入消息队列即将结果返回给客户端
流量大的时候redis并不可靠
应用解耦
用户下单后,订单系统需要通知库存系统
传统方式:订单系统调用库存系统的接口
消息队列:
订单系统:当用户下单后,将订单内容写入消息队列,返回订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式获取下单的信息,库存系统根据下单的信息,进行库存操作
流量削锋
秒杀活动,因为流量过大,导致流量暴增
传统方式:突然接受来自前端大量订单请求
消息队列:在应用前端加入消息队列
用户请求,服务器接收后首先写入消息队列,超过最大数量,直接抛弃用户请求或者跳转到错误页面,
秒杀业务根据消息队列的信息进行相关处理
日志处理
解决大量日志传输的问题
日志采集客户端负责日志数据采集写入Kafka
Kafka消息队列负责日志数据的接受、存储和转发
日志处理应用:订阅并消费Kafka队列中的日志数据
生产环境中,使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka、RocketMQ
Kafka简介
Kafka是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者规模较大的网站中的所有动作流数据
kafka.apache.org
优势:
在普通的硬件上也支持每秒数百万的消息
支持Kafka服务器和消费机集群来区分消息
关键概念
Broker:kafka集群中的一台或者多台服务器统称为broker
Topic:kafka处理的消息源的不同的分类
Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,partition中的每个消息都会被分配一个有序的id
Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息
Producers:消息和数据生产者,向kafka的一个topic发布消息的过程叫做producers
Consumers:消息和数据消费者,订阅topic并处理其发布的消息的过程叫做consumers
使用场景
安装
安装JAVA环境安装JDK
:http://www.oracle.com/technetwork/java/javase/downloads/jdk9-downloads-3848520.html
1 | mkdir /usr/jdk |
source /etc/profile
安装kafka1
2
3
4
5
6
7
8#启动zookeeper server
bin/zookeeper-server-start.sh config/zookeeper.properties &
#启动kafka
bin/kafka-server-start.sh config/server.properties &
#producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zhyunfe --from-beginning
结合PHP使用
安装php扩展
需要打一个库librdkafka:https://github.com/edenhill/librdkafka.git1
2
3
4
5
6git clone https://github.com/edenhill/librdkafka.git
tar zxvf librdkafka
cd librdkafka
./configure
make && make install
php-rdkafka:https://github.com/arnaud-lb/php-rdkafka1
2
3
4
5
6
7
8
9
10
11git clone https://github.com/arnaud-lb/php-rdkafka
tar -zxvf php-rdkafka
cd php-rdkafka
phpize
./configure --with-php-confif=/usr/local/Cellar/php/lib/php-config
make && make install
#编辑php.ini
vim /usr/local/etc/php/7.0/php.ini
extension=rdkafka.so
重启php
使用kafka1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43<?php
/**
* Created by PhpStorm.
* User: zhyunfe
* Date: 2017/12/10
* Time: 下午9:15
*/
namespace app\index\model;
use think\image\Exception;
class Kafka
{
public $broker_list = '192.168.37.140:9092';
public $topic = "topic";
public $partition = 0;
public $logFile = '';
protected $producer = null;
protected $consumer = null;
public function __construct()
{
if (empty($this->broker_list)) {
throw new Exception('broker list not init');
}
$rk = new \RdKafka\Producer();
if (empty($rk)) {
throw new Exception('rdkafka not iniit');
}
$rk->setLogLevel(LOG_DEBUG);
if (!$rk->addBrokers($this->broker_list)) {
throw new Exception('broker_list not init');
}
$this->producer = $rk;
}
public function send($message = [])
{
$topic = $this->producer->newTopic($this->topic);
var_dump($topic);die();
return $topic->produce(RD_KAFKA_PARTITION_UA,$this->partition,json_encode($message));
}
}