JDK8 tomcat启动时SecureRandom 非常慢解决办法

转:jdk8 tomcat启动很慢的问题

JDK8+tomcat8环境tomcat启动时SecureRandom 非常慢解决办法

 

JDK 8 + tomcat8 启动有时会出现 org.apache.catalina.util.SessionIdGeneratorBase- Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [413,255] milliseconds
耗时时间很长。
Tomcat 8启动很慢,且日志上无任何错误,在日志中查看到如下信息:
Log4j:[2015-10-29 15:47:11] INFO ReadProperty:172 – Loading properties file from class path resource [resources/jdbc.properties]
Log4j:[2015-10-29 15:47:11] INFO ReadProperty:172 – Loading properties file from class path resource [resources/common.properties]
29-Oct-2015 15:52:53.587 INFO [localhost-startStop-1] org.apache.catalina.util.SessionIdGeneratorBase.createSecureRandom Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [342,445] milliseconds.
原因

Tomcat 7/8都使用org.apache.catalina.util.SessionIdGeneratorBase.createSecureRandom类产生安全随机类SecureRandom的实例作为会话ID,这里花去了342秒,也即接近6分钟。

SHA1PRNG算法是基于SHA-1算法实现且保密性较强的伪随机数生成器。

在SHA1PRNG中,有一个种子产生器,它根据配置执行各种操作。

1)如果Java.security.egd 属性或securerandom.source属性指定的是”file:/dev/random”或”file:/dev/urandom”,那么JVM 会使用本地种子产生器NativeSeedGenerator,它会调用super()方法,即调用 SeedGenerator.URLSeedGenerator(/dev/random)方法进行初始化。

2)如果java.security.egd属性或securerandom.source属性指定的是其它已存在的URL,那么会调用SeedGenerator.URLSeedGenerator(url)方法进行初始化。

这就是为什么我们设置值为”file:///dev/urandom”或者值为”file:/./dev/random”都会起作用的原因。

在这个实现中,产生器会评估熵池(entropy pool)中的噪声数量。随机数是从熵池中进行创建的。当读操作时,/dev/random设备会只返回熵池中噪声的随机字节。/dev/random非 常适合那些需要非常高质量随机性的场景,比如一次性的支付或生成密钥的场景。

当熵池为空时,来自/dev/random的读操作将被阻塞,直到熵池收集到足够的环境噪声数据。这么做的目的是成为一个密码安全的伪随机数发生器,熵池要有尽可能大的输出。对于生成高质量的加密密钥或者是需要长期保护的场景,一定要这么做。

那么什么是环境噪声?

随机数产生器会手机来自设备驱动器和其它源的环境噪声数据,并放入熵池中。产生器会评估熵池中的噪声数据的数量。当熵池为空时,这个噪声数据的收集是比较花时间的。这就意味着,Tomcat在生产环境中使用熵池时,会被阻塞较长的时间。

解决

有两种解决办法:

1)在Tomcat环境中解决

可以通过配置JRE使用非阻塞的Entropy Source。

在catalina.sh中加入这么一行:-Djava.security.egd=file:/dev/./urandom 即可。

加入后再启动Tomcat,整个启动耗时下降到Server startup in 2912 ms。
2)在JVM环境中解决

打开$JAVA_PATH/jre/lib/security/java.security这个文件,找到下面的内容:

securerandom.source=file:/dev/urandom
替换成

securerandom.source=file:/dev/./urandom
经测试:在JVM环境中解决比较有效。

tomcat 调优

1. 关闭AJP端口

AJP是为 Tomcat 与 HTTP 服务器之间通信而定制的协议,能提供较高的通信速度和效率。如果tomcat前端放的是apache的时候,会使用到AJP这个连接器。若tomcat未与apache配合使用,因此不使用此连接器,因此需要注销掉该连接器。

 2. 开启线程池:

conf/server/xml中配置:

 

minSpareThreads=”100″ maxQueueSize=”100″ prestartminSpareThreads=”true”
maxThreads=”800″/>

connectionTimeout=”20000″
redirectPort=”8443″ URIEncoding=”utf-8″ />

3. tomcat启动apr模式


Tomcat支持三种接收请求的处理方式:BIO、NIO、APR
1>、BIO模式:阻塞式I/O操作,表示Tomcat使用的是传统Java I/O操作(即java.io包及其子包)。
Tomcat7以下版本默认情况下是以bio模式运行的,由于每个请求都要创建一个线程来处理,线程开销较大,不能处理高并发的场景,在三种模式中性能也最低。
启动tomcat看到如下日志,表示使用的是BIO模式:
Sep 17, 2018 6:59:47 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler [“http-bio-8090”]
Sep 17, 2018 6:59:47 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler [“ajp-bio-8009”]


2>、NIO模式:是Java SE 1.4及后续版本提供的一种新的I/O操作方式(即java.nio包及其子包)。
是一个基于缓冲区、并能提供非阻塞I/O操作的Java API,它拥有比传统I/O操作(bio)更好的并发运行性能。
要让Tomcat以nio模式来运行比较简单,只需要在Tomcat安装目录/conf/server.xml文件中将如下配置:
connectionTimeout=”20000″
redirectPort=”8443″ />

修改成:
connectionTimeout=”20000″
redirectPort=”8443″ />
注意:Tomcat8以上版本,默认使用的就是NIO模式,不需要额外修改

2>、APR模式:简单理解,就是从操作系统级别解决异步IO问题,大幅度的提高服务器的处理和响应性能, 也是Tomcat运行高并发应用的首选模式。
启用这种模式稍微麻烦一些,需要安装一些依赖库,介绍安装步聚:

#install apr
wget http://mirrors.cnnic.cn/apache/apr/apr-1.6.3.tar.gz
tar zxvf apr-1.6.3.tar.gz -C /usr/local/
cd apr-1.6.3
./configure –prefix=/usr/local/apr
make & make install

#install apr-util
wget http://mirrors.cnnic.cn/apache/apr/apr-util-1.6.1.tar.gz
tar zxvf apr-util-1.6.1.tar.gz -C /usr/local/
cd apr-util-1.6.1
./configure –prefix=/usr/local/apr-util –with-apr=/usr/local/apr
make & make install

备注: 如果报错:xml/apr_xml.c:35:19: fatal error: expat.h: No such file or directory

         则说明少了expat库,yum install expat-devel安装该库

#其他插件
# yum install apr-devel
yum install openssl-devel
yum install gcc
yum install make

#安装tomcat-native
进入tomcat的bin目录,解压
tar -xvzf tomcat-native.tar.gz -C /usr/local
cd /usr/local
mv tomcat-native-1.1.29-src/ tomcat-native-1.1.29
cd /usr/local/tomcat-native-1.1.29/jni/native

./configure –with-apr=/usr/local/apr –with-java-home=/usr/local/java
make && make install

#配置apr
编辑$TOMCAT_HOME/bin/catalina.sh文件,在虚拟机启动参数JAVA_OPTS中添加java.library.path参数,指定apr库的路径
JAVA_OPTS=”$JAVA_OPTS -Djava.library.path=/usr/local/apr/lib”

Tomcat8以下版本,需要指定运行模式,
然后进入tomcat安装目录,编辑配置文件:conf/server.xml,
将protocol从HTTP/1.1改成org.apache.coyote.http11.Http11AprProtocol

connectionTimeout=”20000″
redirectPort=”8443″ />

其他参考:https://blog.csdn.net/huiyunfei/article/details/79165120

RESTful API 设计指南

一、协议
API与用户的通信协议,总是使用HTTPs协议。
二、域名
应该尽量将API部署在专用域名之下。

https://api.example.com
如果确定API很简单,不会有进一步扩展,可以考虑放在主域名下。

https://example.org/api/
三、版本(Versioning)
应该将API的版本号放入URL。

https://api.example.com/v1/
另一种做法是,将版本号放在HTTP头信息中,但不如放入URL方便和直观。Github采用这种做法。
四、路径(Endpoint)
路径又称”终点”(endpoint),表示API的具体网址。
在RESTful架构中,每个网址代表一种资源(resource),所以网址中不能有动词,只能有名词,而且所用的名词往往与数据库的表格名对应。一般来说,数据库中的表都是同种记录的”集合”(collection),所以API中的名词也应该使用复数。
举例来说,有一个API提供动物园(zoo)的信息,还包括各种动物和雇员的信息,则它的路径应该设计成下面这样。
https://api.example.com/v1/zoos
https://api.example.com/v1/animals
https://api.example.com/v1/employees
五、HTTP动词
对于资源的具体操作类型,由HTTP动词表示。
常用的HTTP动词有下面五个(括号里是对应的SQL命令)。
GET(SELECT):从服务器取出资源(一项或多项)。
POST(CREATE):在服务器新建一个资源。
PUT(UPDATE):在服务器更新资源(客户端提供改变后的完整资源)。
PATCH(UPDATE):在服务器更新资源(客户端提供改变的属性)。
DELETE(DELETE):从服务器删除资源。
还有两个不常用的HTTP动词。
HEAD:获取资源的元数据。
OPTIONS:获取信息,关于资源的哪些属性是客户端可以改变的。
下面是一些例子。
GET /zoos:列出所有动物园
POST /zoos:新建一个动物园
GET /zoos/ID:获取某个指定动物园的信息
PUT /zoos/ID:更新某个指定动物园的信息(提供该动物园的全部信息)
PATCH /zoos/ID:更新某个指定动物园的信息(提供该动物园的部分信息)
DELETE /zoos/ID:删除某个动物园
GET /zoos/ID/animals:列出某个指定动物园的所有动物
DELETE /zoos/ID/animals/ID:删除某个指定动物园的指定动物
六、过滤信息(Filtering)
如果记录数量很多,服务器不可能都将它们返回给用户。API应该提供参数,过滤返回结果。
下面是一些常见的参数。
?limit=10:指定返回记录的数量
?offset=10:指定返回记录的开始位置。
?page=2&per_page=100:指定第几页,以及每页的记录数。
?sortby=name&order=asc:指定返回结果按照哪个属性排序,以及排序顺序。
?animal_type_id=1:指定筛选条件
参数的设计允许存在冗余,即允许API路径和URL参数偶尔有重复。比如,GET /zoo/ID/animals 与 GET /animals?zoo_id=ID 的含义是相同的。
七、状态码(Status Codes)
服务器向用户返回的状态码和提示信息,常见的有以下一些(方括号中是该状态码对应的HTTP动词)。
200 OK – [GET]:服务器成功返回用户请求的数据,该操作是幂等的(Idempotent)。
201 CREATED – [POST/PUT/PATCH]:用户新建或修改数据成功。
202 Accepted – []:表示一个请求已经进入后台排队(异步任务)
204 NO CONTENT – [DELETE]:用户删除数据成功。
400 INVALID REQUEST – [POST/PUT/PATCH]:用户发出的请求有错误,服务器没有进行新建或修改数据的操作,该操作是幂等的。
401 Unauthorized – [
]:表示用户没有权限(令牌、用户名、密码错误)。
403 Forbidden – [] 表示用户得到授权(与401错误相对),但是访问是被禁止的。
404 NOT FOUND – [
]:用户发出的请求针对的是不存在的记录,服务器没有进行操作,该操作是幂等的。
406 Not Acceptable – [GET]:用户请求的格式不可得(比如用户请求JSON格式,但是只有XML格式)。
410 Gone -[GET]:用户请求的资源被永久删除,且不会再得到的。
422 Unprocesable entity – [POST/PUT/PATCH] 当创建一个对象时,发生一个验证错误。
500 INTERNAL SERVER ERROR – [*]:服务器发生错误,用户将无法判断发出的请求是否成功。
状态码的完全列表参见这里。
八、错误处理(Error handling)
如果状态码是4xx,就应该向用户返回出错信息。一般来说,返回的信息中将error作为键名,出错信息作为键值即可。

{
error: “Invalid API key”
}
九、返回结果
针对不同操作,服务器向用户返回的结果应该符合以下规范。
GET /collection:返回资源对象的列表(数组)
GET /collection/resource:返回单个资源对象
POST /collection:返回新生成的资源对象
PUT /collection/resource:返回完整的资源对象
PATCH /collection/resource:返回完整的资源对象
DELETE /collection/resource:返回一个空文档
十、Hypermedia API
RESTful API最好做到Hypermedia,即返回结果中提供链接,连向其他API方法,使得用户不查文档,也知道下一步应该做什么。
比如,当用户向api.example.com的根目录发出请求,会得到这样一个文档。

{“link”: {
“rel”: “collection https://www.example.com/zoos“,
“href”: “https://api.example.com/zoos“,
“title”: “List of zoos”,
“type”: “application/vnd.yourformat+json”
}}
上面代码表示,文档中有一个link属性,用户读取这个属性就知道下一步该调用什么API了。rel表示这个API与当前网址的关系(collection关系,并给出该collection的网址),href表示API的路径,title表示API的标题,type表示返回类型。
Hypermedia API的设计被称为HATEOAS。Github的API就是这种设计,访问api.github.com会得到一个所有可用API的网址列表。

{
“current_user_url”: “https://api.github.com/user“,
“authorizations_url”: “https://api.github.com/authorizations“,
// …
}
从上面可以看到,如果想获取当前用户的信息,应该去访问api.github.com/user,然后就得到了下面结果。

{
“message”: “Requires authentication”,
“documentation_url”: “https://developer.github.com/v3
}
上面代码表示,服务器给出了提示信息,以及文档的网址。
十一、其他
(1)API的身份认证应该使用OAuth 2.0框架。
(2)服务器返回的数据格式,应该尽量使用JSON,避免使用XML。

=============================华丽的分割线=====================================

1.使用名词而不是动词

Resource 资源

GET 读

POST 创建

PUT 修改

DELETE 删除
/cars 返回 cars集合 创建新的资源 批量更新cars 删除所有cars
/cars/711 返回特定的car 该方法不允许(405) 更新一个指定的资源 擅长指定资源
不要使用:

/getAllCars
/createNewCar
/deleteAllRedCars

2.Get方法和查询参数不应该涉及状态改变

使用PUT, POST 和DELETE 方法 而不是 GET 方法来改变状态,不要使用GET 进行状态改变:

GET /users/711?activate
GET /users/711/activate

3.使用复数名词

不要混淆名词单数和复数,为了保持简单,只对所有资源使用复数。

/cars 而不是 /car
/users 而不是 /user
/products 而不是 /product
/settings 而部署 /setting

  1. 使用子资源表达关系

如果一个资源与另外一个资源有关系,使用子资源:

GET /cars/711/drivers/ 返回 car 711的所有司机
GET /cars/711/drivers/4 返回 car 711的4号司机

5.使用Http头声明序列化格式

在客户端和服务端,双方都要知道通讯的格式,格式在HTTP-Header中指定

Content-Type 定义请求格式
Accept 定义系列可接受的响应格式

6.使用HATEOAS

Hypermedia as the Engine of Application State 超媒体作为应用状态的引擎,超文本链接可以建立更好的文本浏览:

{
“id”: 711,
“manufacturer”: “bmw”,
“model”: “X5”,
“seats”: 5,
“drivers”: [
{
“id”: “23”,
“name”: “Stefan Jauker”,
“links”: [
{
“rel”: “self”,
“href”: “/api/v1/drivers/23”
}
]
}
]
}
注意href指向下一个URL

7.为集合提供过滤 排序 选择和分页等功能

Filtering过滤:

使用唯一的查询参数进行过滤:

GET /cars?color=red 返回红色的cars
GET /cars?seats<=2 返回小于两座位的cars集合

Sorting排序:

允许针对多个字段排序

GET /cars?sort=-manufactorer,+model

这是返回根据生产者降序和模型升序排列的car集合

Field selection

移动端能够显示其中一些字段,它们其实不需要一个资源的所有字段,给API消费者一个选择字段的能力,这会降低网络流量,提高API可用性。

GET /cars?fields=manufacturer,model,id,color

Paging分页

使用 limit 和offset.实现分页,缺省limit=20 和offset=0;

GET /cars?offset=10&limit=5

为了将总数发给客户端,使用订制的HTTP头: X-Total-Count.

链接到下一页或上一页可以在HTTP头的link规定,遵循Link规定:

Link: https://blog.mwaysolutions.com/sample/api/v1/cars?offset=15&limit=5; rel=”next”,
https://blog.mwaysolutions.com/sample/api/v1/cars?offset=50&limit=3; rel=”last”,
https://blog.mwaysolutions.com/sample/api/v1/cars?offset=0&limit=5; rel=”first”,
https://blog.mwaysolutions.com/sample/api/v1/cars?offset=5&limit=5; rel=”prev”,

8.版本化你的API

使得API版本变得强制性,不要发布无版本的API,使用简单数字,避免小数点如2.5.

一般在Url后面使用?v

/blog/api/v1

  1. 使用Http状态码处理错误

如果你的API没有错误处理是很难的,只是返回500和出错堆栈不一定有用

Http状态码提供70个出错,我们只要使用10个左右:

200 – OK – 一切正常
201 – OK – 新的资源已经成功创建
204 – OK – 资源已经成功擅长

304 – Not Modified – 客户端使用缓存数据

400 – Bad Request – 请求无效,需要附加细节解释如 “JSON无效”
401 – Unauthorized – 请求需要用户验证
403 – Forbidden – 服务器已经理解了请求,但是拒绝服务或这种请求的访问是不允许的。
404 – Not found – 没有发现该资源
422 – Unprocessable Entity – 只有服务器不能处理实体时使用,比如图像不能被格式化,或者重要字段丢失。

500 – Internal Server Error – API开发者应该避免这种错误。

使用详细的错误包装错误:

{
“errors”: [
{
“userMessage”: “Sorry, the requested resource does not exist”,
“internalMessage”: “No car found in the database”,
“code”: 34,
“more info”: “http://dev.mwaysolutions.com/blog/api/v1/errors/12345
}
]
}
10.允许覆盖http方法

一些代理只支持POST 和 GET方法, 为了使用这些有限方法支持RESTful API,需要一种办法覆盖http原来的方法。

使用订制的HTTP头 X-HTTP-Method-Override 来覆盖POST 方法.

RESTful API 设计最佳实践

 

RESTful API 设计最佳实践

 
摘要:目前互联网上充斥着大量的关于RESTful API(为了方便,以后API和RESTful API 一个意思)如何设计的文章,然而却没有一个”万能“的设计标准:如何鉴权?API格式如何?你的API是否应该加入版本信息?

背景

目前互联网上充斥着大量的关于RESTful API(为了方便,以后API和RESTful API 一个意思)如何设计的文章,然而却没有一个”万能“的设计标准:如何鉴权?API格式如何?你的API是否应该加入版本信息?当你开始写一个app的时候,特别是后端模型部分已经写完的时候,你不得不殚精竭虑的设计和实现自己app的public API部分。因为一旦发布,对外发布的API将会很难改变。

在给SupportedFu设计API的时候,我试图以实用的角度来解决上面提到的问题。我希望可以设计出容易使用,容易部署,并且足够灵活的API,本文因此而生。

API设计的基本要求

网上的很多关于API设计的观点都十分”学院派“,它们也许更有理论基础,但是有时却和现实世界脱轨(因此我是自由派)。所以我这篇文章的目标是从实践的角度出发,给出当前网络应用的API设计最佳实践(当然,是我认为的最佳了~),如果觉得不合适,我不会遵从标准。当然作为设计的基础,几个必须的原则还是要遵守的:

 

  1. 当标准合理的时候遵守标准。
  2. API应该对程序员友好,并且在浏览器地址栏容易输入。
  3. API应该简单,直观,容易使用的同时优雅。
  4. API应该具有足够的灵活性来支持上层ui。
  5. API设计权衡上述几个原则。

 

需要强调的是:API的就是程序员的UI,和其他UI一样,你必须仔细考虑它的用户体验!

使用RESTful URLs 和action.

虽然前面我说没有一个万能的API设计标准。但确实有一个被普遍承认和遵守:RESTfu设计原则。它被Roy Felding提出(在他的”基于网络的软件架构“论文中第五章)。而REST的核心原则是将你的API拆分为逻辑上的资源。这些资源通过http被操作(GET ,POST,PUT,DELETE)。

那么我应该如何拆分出这些资源呢?

显然从API用户的角度来看,”资源“应该是个名词。即使你的内部数据模型和资源已经有了很好的对应,API设计的时候你仍然不需要把它们一对一的都暴露出来。这里的关键是隐藏内部资源,暴露必需的外部资源。

在SupportFu里,资源是 ticket、user、group。

一旦定义好了要暴露的资源,你可以定义资源上允许的操作,以及这些操作和你的API的对应关系:

 

  • GET /tickets # 获取ticket列表
  • GET /tickets/12 # 查看某个具体的ticket
  • POST /tickets # 新建一个ticket
  • PUT /tickets/12 # 更新ticket 12.
  • DELETE /tickets/12 #删除ticekt 12

 

可以看出使用REST的好处在于可以充分利用http的强大实现对资源的CURD功能。而这里你只需要一个endpoint:/tickets,再没有其他什么命名规则和url规则了,cool!

这个endpoint的单数复数

一个可以遵从的规则是:虽然看起来使用复数来描述某一个资源实例看起来别扭,但是统一所有的endpoint,使用复数使得你的URL更加规整。这让API使用者更加容易理解,对开发者来说也更容易实现。

如何处理关联?关于如何处理资源之间的管理REST原则也有相关的描述:

 

  • GET /tickets/12/messages- Retrieves list of messages for ticket #12
  • GET /tickets/12/messages/5- Retrieves message #5 for ticket #12
  • POST /tickets/12/messages- Creates a new message in ticket #12
  • PUT /tickets/12/messages/5- Updates message #5 for ticket #12
  • PATCH /tickets/12/messages/5- Partially updates message #5 for ticket #12
  • DELETE /tickets/12/messages/5- Deletes message #5 for ticket #12

 

其中,如果这种关联和资源独立,那么我们可以在资源的输出表示中保存相应资源的endpoint。然后API的使用者就可以通过点击链接找到相关的资源。如果关联和资源联系紧密。资源的输出表示就应该直接保存相应资源信息。(例如这里如果message资源是独立存在的,那么上面 GET /tickets/12/messages就会返回相应message的链接;相反的如果message不独立存在,他和ticket依附存在,则上面的API调用返回直接返回message信息)

不符合CURD的操作

对这个令人困惑的问题,下面是一些解决方法:

 

  1. 重构你的行为action。当你的行为不需要参数的时候,你可以把active对应到activated这个资源,(更新使用patch).
  2. 以子资源对待。例如:GitHub上,对一个gists加星操作:PUT /gists/:id/star 并且取消星操作:DELETE /gists/:id/star.
  3. 有时候action实在没有难以和某个资源对应上例如search。那就这么办吧。我认为API的使用者对于/search这种url也不会有太大意见的(毕竟他很容易理解)。只要注意在文档中写清楚就可以了。

 

永远使用SSL

毫无例外,永远都要使用SSL。你的应用不知道要被谁,以及什么情况访问。有些是安全的,有些不是。使用SSL可以减少鉴权的成本:你只需要一个简单的令牌(token)就可以鉴权了,而不是每次让用户对每次请求签名。

值得注意的是:不要让非SSL的url访问重定向到SSL的url。

文档

文档和API本身一样重要。文档应该容易找到,并且公开(把它们藏到pdf里面或者存到需要登录的地方都不太好)。文档应该有展示请求和输出的例子:或者以点击链接的方式或者通过curl的方式(请见openstack的文档)。如果有更新(特别是公开的API),应该及时更新文档。文档中应该有关于何时弃用某个API的时间表以及详情。使用邮件列表或者博客记录是好方法。

版本化

在API上加入版本信息可以有效的防止用户访问已经更新了的API,同时也能让不同主要版本之间平稳过渡。关于是否将版本信息放入url还是放入请求头有过争论:API version should be included in the URL or in a header. 学术界说它应该放到header里面去,但是如果放到url里面我们就可以跨版本的访问资源了。。(参考openstack)。

strip使用的方法就很好:它的url里面有主版本信息,同时请求头俩面有子版本信息。这样在子版本变化过程中url的稳定的。变化有时是不可避免的,关键是如何管理变化。完整的文档和合理的时间表都会使得API使用者使用的更加轻松。

结果过滤,排序,搜索:

url最好越简短越好,和结果过滤,排序,搜索相关的功能都应该通过参数实现(并且也很容易实现)。

过滤:为所有提供过滤功能的接口提供统一的参数。例如:你想限制get /tickets 的返回结果:只返回那些open状态的ticket–get /tickektsstate=open这里的state就是过滤参数。

排序:和过滤一样,一个好的排序参数应该能够描述排序规则,而不业务相关。复杂的排序规则应该通过组合实现:

 

  • GET /ticketssort=-priority- Retrieves a list of tickets in descending order of priority
  • GET /ticketssort=-priority,created_at- Retrieves a list of tickets in descending order of priority. Within a specific priority, older tickets are ordered first

 

这里第二条查询中,排序规则有多个rule以逗号间隔组合而成。

搜索:有些时候简单的排序是不够的。我们可以使用搜索技术(ElasticSearch和Lucene)来实现(依旧可以作为url的参数)。

 

  • GET /ticketsq=return&state=open&sort=-priority,created_at- Retrieve the highest priority open tickets mentioning the word ‘return’

 

对于经常使用的搜索查询,我们可以为他们设立别名,这样会让API更加优雅。例如:
get /ticketsq=recently_closed -> get /tickets/recently_closed.

限制API返回值的域

有时候API使用者不需要所有的结果,在进行横向限制的时候(例如值返回API结果的前十项)还应该可以进行纵向限制。并且这个功能能有效的提高网络带宽使用率和速度。可以使用fields查询参数来限制返回的域例如:
GET /ticketsfields=id,subject,customer_name,updated_at&state=open&sort=-updated_at

更新和创建操作应该返回资源

PUT、POST、PATCH 操作在对资源进行操作的时候常常有一些副作用:例如created_at,updated_at 时间戳。为了防止用户多次的API调用(为了进行此次的更新操作),我们应该会返回更新的资源(updated representation.)例如:在POST操作以后,返回201 created 状态码,并且包含一个指向新资源的url作为返回头

是否需要 “HATEOAS

网上关于是否允许用户创建新的url有很大的异议(注意不是创建资源产生的url)。为此REST制定了HATEOAS来描述了和endpoint进行交互的时候,行为应该在资源的metadata返回值里面进行定义。

(译注:作者这里认为HATEOAS还不算成熟,我也不怎么理解这段就算了,读者感兴趣可以自己去原文查看)

只提供json作为返回格式

现在开始比较一下XML和json了。XML即冗长,难以阅读,又不适合各种编程语言解析。当然XML有扩展性的优势,但是如果你只是将它来对内部资源串行化,那么他的扩展优势也发挥不出来。很多应用(youtube,twitter,box)都已经开始抛弃XML了,我也不想多费口舌。

当然如果的你使用用户里面企业用户居多,那么可能需要支持XML。如果是这样的话你还有另外一个问题:你的http请求中的media类型是应该和accept 头同步还是和url?为了方便(browser explorability),应该是在url中(用户只要自己拼url就好了)。如果这样的话最好的方法是使用.xml或者.json的后缀。

命名方式?

是蛇形命令(下划线和小写)还是驼峰命名?如果使用json那么最好的应该是遵守JAVASCRIPT的命名方法-也就是说骆驼命名法。如果你正在使用多种语言写一个库,那么最好按照那些语言所推荐的,java,c#使用骆驼,python,ruby使用snake。

个人意见:我总觉得蛇形命令更好使一些,当然这没有什么理论的依据。有人说蛇形命名读起来更快,能达到20%,也不知道真假http://ieeexplore.ieee.org/xpl/articleDetails.jsptp=&arnumber=5521745

默认使用pretty print格式,使用gzip

只是使用空格的返回结果从浏览器上看总是觉得很恶心(一大坨有没有?~)。当然你可以提供url上的参数来控制使用“pretty print”,但是默认开启这个选项还是更加友好。格外的传输上的损失不会太大。相反你如果忘了使用gzip那么传输效率将会大大减少,损失大大增加。想象一个用户正在debug那么默认的输出就是可读的-而不用将结果拷贝到其他什么软件中在格式化-是想起来就很爽的事,不是么?

下面是一个例子:

 

$ curl https://API.github.com/users/veesahni > with-whitespace.txt
$ ruby -r json -e 'puts JSON JSON.parse(STDIN.read)' < with-whitespace.txt > without-whitespace.txt
$ gzip -c with-whitespace.txt > with-whitespace.txt.gz
$ gzip -c without-whitespace.txt > without-whitespace.txt.gz

 

输出如下:

 

  • without-whitespace.txt- 1252 bytes
  • with-whitespace.txt- 1369 bytes
  • without-whitespace.txt.gz- 496 bytes
  • with-whitespace.txt.gz- 509 bytes

 

在上面的例子中,多余的空格使得结果大小多出了8.5%(没有使用gzip),相反只多出了2.6%。据说:twitter使用gzip之后它的streaming API传输减少了80%(link:https://dev.twitter.com/blog/announcing-gzip-compression-streaming-APIs).

只在需要的时候使用“envelope”

很多API象下面这样返回结果:

 

[java] view plain copy print ?
 
  1. {
  2. “data” : {
  3. “id” : 123,
  4. “name” : “John”
  5. }
  6. }
{
  "data" : {
    "id" : 123,
    "name" : "John"
  }
}

 

 

理由很简单:这样做可以很容易扩展返回结果,你可以加入一些分页信息,一些数据的元信息等-这对于那些不容易访问到返回头的API使用者来说确实有用,但是随着“标准”的发展(cors和http://tools.ietf.org/html/rfc5988#page-6都开始被加入到标准中了),我个人推荐不要那么做。

何时使用envelope?

有两种情况是应该使用envelope的。如果API使用者确实无法访问返回头,或者API需要支持交叉域请求(通过jsonp)。
jsonp请求在请求的url中包含了一个callback函数参数。如果给出了这个参数,那么API应该返回200,并且把真正的状态码放到返回值里面(包装在信封里),例如:

 

  1. callback_function({
  2. status_code: 200,
  3. next_page: “https://..”,
  4. response: {
  5. … actual JSON response body …
  6. }
  7. })
callback_function({
  status_code: 200,
  next_page: "https://..",
  response: {
    ... actual JSON response body ... 
  }
})

 

 

同样为了支持无法方法返回头的API使用者,可以允许envelope=true这样的参数。

在post,put,patch上使用json作为输入

如果你认同我上面说的,那么你应该决定使用json作为所有的API输出格式,那么我们接下来考虑考虑API的输入数据格式。
很多的API使用url编码格式:就像是url查询参数的格式一样:单纯的键值对。这种方法简单有效,但是也有自己的问题:它没有数据类型的概念。这使得程序不得不根据字符串解析出布尔和整数,而且还没有层次结构–虽然有一些关于层次结构信息的约定存在可是和本身就支持层次结构的json比较一下还是不很好用。

当然如果API本身就很简单,那么使用url格式的输入没什么问题。但对于复杂的API你应该使用json。或者干脆统一使用json。
注意使用json传输的时候,要求请求头里面加入:Content-Type:applicatin/json.否则抛出415异常(unsupported media type)。

分页

分页数据可以放到“信封”里面,但随着标准的改进,现在我推荐将分页信息放到link header里面:http://tools.ietf.org/html/rfc5988#page-6。

使用link header的API应该返回一系列组合好了的url而不是让用户自己再去拼。这点在基于游标的分页中尤为重要。例如下面,来自github的文档

 

Link: <https://api.github.com/user/repos?page=3&per_page=100>; rel="next", 
<https://api.github.com/user/repos?page=50&per_page=100>; rel="last"

 

 

自动加载相关的资源

很多时候,自动加载相关资源非常有用,可以很大的提高效率。但是这却和RESTful的原则相背。为了如此,我们可以在url中添加参数:embed(或者expend)。embed可以是一个逗号分隔的串,例如:

 

GET /ticket/12embed=customer.name,assigned_user

 

 

对应的API返回值如下:

 

  1. {
  2. “id” : 12,
  3. “subject” : “I have a question!”,
  4. “summary” : “Hi, ….”,
  5. “customer” : {
  6. “name” : “Bob”
  7. },
  8. assigned_user: {
  9. “id” : 42,
  10. “name” : “Jim”,
  11. }
  12. }
{
  "id" : 12,
  "subject" : "I have a question!",
  "summary" : "Hi, ....",
  "customer" : {
    "name" : "Bob"
  },
  assigned_user: {
   "id" : 42,
   "name" : "Jim",
  }
}

 

 

值得提醒的是,这个功能有时候会很复杂,并且可能导致N+1 SELECT 问题

重写HTTP方法

有的客户端只能发出简单的GET 和POST请求。为了照顾他们,我们可以重写HTTP请求。这里没有什么标准,但是一个普遍的方式是接受X-HTTP-Method-Override请求头。

速度限制

为了避免请求泛滥,给API设置速度限制很重要。为此 RFC 6585 引入了HTTP状态码429(too many requests)。加入速度设置之后,应该提示用户,至于如何提示标准上没有说明,不过流行的方法是使用HTTP的返回头。

下面是几个必须的返回头(依照twitter的命名规则):

 

  • X-Rate-Limit-Limit :当前时间段允许的并发请求数
  • X-Rate-Limit-Remaining:当前时间段保留的请求数。
  • X-Rate-Limit-Reset:当前时间段剩余秒数

 

为什么使用当前时间段剩余秒数而不是时间戳?

时间戳保存的信息很多,但是也包含了很多不必要的信息,用户只需要知道还剩几秒就可以再发请求了这样也避免了clock skew问题

有些API使用UNIX格式时间戳,我建议不要那么干。为什么?HTTP 已经规定了使用 RFC 1123 时间格式

鉴权 Authentication

restful API是无状态的也就是说用户请求的鉴权和cookie以及session无关,每一次请求都应该包含鉴权证明。

通过使用ssl我们可以不用每次都提供用户名和密码:我们可以给用户返回一个随机产生的token。这样可以极大的方便使用浏览器访问API的用户。这种方法适用于用户可以首先通过一次用户名-密码的验证并得到token,并且可以拷贝返回的token到以后的请求中。如果不方便,可以使用OAuth 2来进行token的安全传输。

支持jsonp的API需要额外的鉴权方法,因为jsonp请求无法发送普通的credential。这种情况下可以在查询url中添加参数:access_token。注意使用url参数的问题是:目前大部分的网络服务器都会讲query参数保存到服务器日志中,这可能会成为大的安全风险。

注意上面说到的只是三种传输token的方法,实际传输的token可能是一样的。

缓存

HTTP提供了自带的缓存框架。你需要做的是在返回的时候加入一些返回头信息,在接受输入的时候加入输入验证。基本两种方法:

ETag:当生成请求的时候,在HTTP头里面加入ETag,其中包含请求的校验和和哈希值,这个值和在输入变化的时候也应该变化。如果输入的HTTP请求包含IF-NONE-MATCH头以及一个ETag值,那么API应该返回304 not modified状态码,而不是常规的输出结果。

Last-Modified:和etag一样,只是多了一个时间戳。返回头里的Last-Modified:包含了 RFC 1123 时间戳,它和IF-MODIFIED-SINCE一致。HTTP规范里面有三种date格式,服务器应该都能处理。

出错处理

就像html错误页面能够显示错误信息一样,API 也应该能返回可读的错误信息–它应该和一般的资源格式一致。API应该始终返回相应的状态码,以反映服务器或者请求的状态。API的错误码可以分为两部分,400系列和500系列,400系列表明客户端错误:如错误的请求格式等。500系列表示服务器错误。API应该至少将所有的400系列的错误以json形式返回。如果可能500系列的错误也应该如此。json格式的错误应该包含以下信息:一个有用的错误信息,一个唯一的错误码,以及任何可能的详细错误描述。如下:

 

  1. {
  2. “code” : 1234,
  3. “message” : “Something bad happened http://blog.jobbole.com/wp-includes/images/smilies/icon_sad.gif” alt=”:-(” class=”wp-smiley”> “,
  4. “description” : “More details about the error here”
  5. }
{
  "code" : 1234,
  "message" : "Something bad happened :-( ",
  "description" : "More details about the error here"
}

 

 

对PUT,POST,PATCH的输入的校验也应该返回相应的错误信息,例如:

 

  1. “code” : 1024,
  2. “message” : “Validation Failed”,
  3. “errors” : [
  4. {
  5. “code” : 5432,
  6. “field” : “first_name”,
  7. “message” : “First name cannot have fancy characters”
  8. },
  9. {
  10. “code” : 5622,
  11. “field” : “password”,
  12. “message” : “Password cannot be blank”
  13. }
  14. ]
  15. }
{
  "code" : 1024,
  "message" : "Validation Failed",
  "errors" : [
    {
      "code" : 5432,
      "field" : "first_name",
      "message" : "First name cannot have fancy characters"
    },
    {
       "code" : 5622,
       "field" : "password",
       "message" : "Password cannot be blank"
    }
  ]
}

 

 

HTTP 状态码

 

200 ok  - 成功返回状态,对应,GET,PUT,PATCH,DELETE.
 201 created  - 成功创建。
 304 not modified   - HTTP缓存有效。
 400 bad request   - 请求格式错误。
 401 unauthorized   - 未授权。
 403 forbidden   - 鉴权成功,但是该用户没有权限。
 404 not found - 请求的资源不存在
 405 method not allowed - 该http方法不被允许。
 410 gone - 这个url对应的资源现在不可用。
 415 unsupported media type - 请求类型错误。
 422 unprocessable entity - 校验错误时用。
 429 too many request - 请求过多。

JAVA – 锁介绍

在介绍锁之前我们先介绍一个线程不安全的例子,一个全局的list,开2个线程往里面插入数据,代码如下:

  1. package com.jvm.day6.lock.demo;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5.   
  6. /** 
  7.  * 测试都线程共享一个变量带来的现象 
  8.  * @Author:xuehan 
  9.  * @Date:2016年3月20日下午3:35:29 
  10.  */  
  11. class NumberAdd implements Runnable{  
  12.       
  13.     public static  List<Integer> numberList =new ArrayList<Integer>();  
  14.     public static int startNum;  
  15.     public NumberAdd(int startNum){  
  16.         this.startNum = startNum;  
  17.     }  
  18.     @Override  
  19.     public void run() {  
  20.         int count = 0;  
  21.         while(count < 1000000){  
  22.             numberList.add(count ++ );  
  23.             startNum = startNum + 2;  
  24.               
  25.         }  
  26.     }  
  27. }  
  28. public class Test{  
  29.     public static void main(String[] args) throws Exception {  
  30.         Thread t1 = new Thread(new NumberAdd(1));  
  31.         Thread t2 = new Thread(new NumberAdd(0));  
  32.         t1.start();  
  33.         t2.start();  
  34.         while(t1.isAlive() || t2.isAlive()){Thread.sleep(2);}  
  35.         System.out.println(“集合大小” + NumberAdd.numberList.size() );  
  36.         System.out.println( “最后一个值的大”  + NumberAdd.numberList.get(NumberAdd.numberList.size() – 1));  
  37.        for(int i = NumberAdd.numberList.size()  – 10 ;  i < NumberAdd.numberList.size() –1; i ++){  
  38.            System.out.println(NumberAdd.numberList.get(i));  
  39.        }  
  40.    
  41.     }  
  42. }  

 

 按照开始想的,集合里面应该有200万个数据了,结果却出现了数组越界的错误,为什么呢,这是因为ArrayList不是线程安全的,用来存放数据的elementData是共享的, 线程A往list里添加数据的时候刚验证大小通过,还没有插入,然后轮到线程B执行,线程B刚好插入了list该扩容的最后一个元素,然后list满了,线程A执行,A线程往集合里面插入元素,引起了数据越界。

 



 

 

jvm锁

每个对象都一个mark头,他的作用是:

Mark Word,对象头的标记,32位

描述对象的hash、锁信息,垃圾回收标记,年龄

指向锁记录的指针

指向monitor的指针

GC标记

偏向锁线程ID

 

偏向锁
jvm控制,可以设置jvm启动参数
大部分情况是没有竞争的,所以可以通过偏向来提高性能
所谓的偏向,就是偏心,即锁会偏向于当前已经占有锁的线程
将对象头Mark的标记设置为偏向,并将线程ID写入对象头Mark
只要没有竞争,获得偏向锁的线程,在将来进入同步块,不需要做同步
当其他线程请求相同的锁时,偏向模式结束
-XX:+UseBiasedLocking -jdk6需要手动打开
默认启用
在竞争激烈的场合,偏向锁会增加系统负担
偏向锁是系统自带的设置参数开启,java代码如下:
  1. package com.jvm.day6.lock;  
  2.   
  3. import java.util.List;  
  4. import java.util.Vector;  
  5.   
  6. /** 
  7.  *使用 偏向锁-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 
  8.  * 不使用偏向锁-XX:-UseBiasedLocking 
  9.  * 根据测试下面代码使用偏向锁可提高150毫秒执行时间 150/2300,提高效率6%  
  10.  * @Author:xuehan 
  11.  * @Date:2016年3月19日下午12:06:15 
  12.  */  
  13. public class DeflectionLock {  
  14.         public static List<Integer> numberList =new Vector<Integer>();  
  15.         public static void main(String[] args) throws InterruptedException {  
  16.             System.out.println((int)‘l’);  
  17.             long begin=System.currentTimeMillis();  
  18.             int count=0;  
  19.             int startnum=0;  
  20.             while(count<10000000){  
  21.                 numberList.add(startnum);  
  22.                 startnum+=2;  
  23.                 count++;  
  24.             }  
  25.             long end=System.currentTimeMillis();  
  26.             System.out.println(end-begin);  
  27.   
  28.     }  
  29. }  
 根据我的写实偏向锁可以提高性能6%
轻量级锁 BasicObjectLock,这个锁是嵌入到线程栈中的,他有两部组成BasicLock和ptr to obj hold the lock,BasicLock里面存放着对象头,ptr to obj hold the lock为指向持有对象锁的指针
普通的锁处理性能不够理想,轻量级锁是一种快速的锁定方法。
如果对象没有被锁定
将对象头的Mark指针保存到锁对象中
将对象头设置为指向锁的指针(在线程栈空间中)

如果轻量级锁失败,表示存在竞争,升级为重量级锁(常规锁)

在没有锁竞争的前提下,减少传统锁使用OS互斥量产生的性能损耗

 

在竞争激烈时,轻量级锁会多做很多额外操作,导致性能下降

自旋锁
当竞争存在时,如果线程可以很快获得锁,那么可以不在OS层挂起线程,让线程做几个空操作(自旋)
JDK1.6中-XX:+UseSpinning开启
JDK1.7中,去掉此参数,改为内置实现
如果同步块很长,自旋失败,会降低系统性能
如果同步块很短,自旋成功,节省线程挂起切换时间,提升系统性能
 

上面的一些锁不是Java语言层面的锁优化方法

他们是内置于JVM中的获取锁的优化方法和获取锁的步骤

偏向锁可用会先尝试偏向锁

轻量级锁可用会先尝试轻量级锁

以上都失败,尝试自旋锁

再失败,尝试普通锁,使用OS互斥量在操作系统层挂起

 

我们可以从以下方面对锁进行优化

减少锁的时间

没必须放在同步块的代码尽量不要放在代码块里

减少锁的粒度

将大对象,拆成小对象,大大增加并行度,降低锁竞争

 

偏向锁,轻量级锁成功率提高

实现的例子如ConcurrentHashMap

使用若干个Segment :Segment<K,V>[] segments

Segment中维护HashEntry<K,V>

put操作时

先定位到Segment,锁定一个Segment,执行put

 

在减小锁粒度后, ConcurrentHashMap允许若干个线程同时进入

 

锁分离

根据功能进行锁分离

ReadWriteLock

读多写少的情况,可以提高性能



 
读写分离思想可以延伸,只要操作互不影响,锁就可以分离

LinkedBlockingQueue

队列

链表



 
锁粗化

通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,即在使用完公共资源后,应该立即释放锁。只有这样,等待在这个锁上的其他线程才能尽早的获得资源执行任务。但是,凡事都有一个度,如果对同一个锁不停的进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能的优化

 

Java代码
  1. for(int i=0;i<1000;i++){  
  2.     synchronized(lock){}  
  3.           
  4. }  
  5. // 应该写成  
  6. synchronized(lock){  
  7.    for(int i =0; i < 1000; i ++){}  
  8. }  

 

这时候我们要增加锁的持有时间不要让请求和释放锁频繁的发生

 

锁消除

在java方法体里如果不是共享的变量不需要同步操作的,这时候jvm会自动的优化把锁去掉,如StingBuffer和Vector,使用锁消除

 

-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks

关闭锁消除

 

-server -XX:+DoEscapeAnalysis -XX:-EliminateLocks

 

无锁

锁是悲观的操作

无锁是乐观的操作

无锁的一种实现方式

CAS(Compare And Swap),CAS是原子的

非阻塞的同步

CAS(V,E,N)

在应用层面判断多线程的干扰,如果有干扰,则通知线程重试,一般这样做会让程序变的复杂,但性能更加好。

Kafka详细原理总结

转自:chenxiangxiang的博客

Kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
 
1.前言
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
 
 1.1  Kafka的特性:
– 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
– 可扩展性:kafka集群支持热扩展
– 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
– 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
– 高并发:支持数千个客户端同时读写
 
1.2   Kafka的使用场景:
– 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
– 消息系统:解耦和生产者和消费者、缓存消息等。
– 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
– 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
– 流式处理:比如spark streaming和storm
– 事件源
 
1.3  Kakfa的设计思想
– Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。
这里曾经发生过一个bug,TalkingData使用Kafka0.8.1的时候,kafka controller在Zookeeper上注册成功后,它和Zookeeper通信的timeout时间是6s,也就是如果kafka controller如果有6s中没有和Zookeeper做心跳,那么Zookeeper就认为这个kafka controller已经死了,就会在Zookeeper上把这个临时节点删掉,那么其他Kafka就会认为controller已经没了,就会再次抢着注册临时节点,注册成功的那个kafka broker成为controller,然后,之前的那个kafka controller就需要各种shut down去关闭各种节点和事件的监听。但是当kafka的读写流量都非常巨大的时候,TalkingData的一个bug是,由于网络等原因,kafka controller和Zookeeper有6s中没有通信,于是重新选举出了一个新的kafka controller,但是原来的controller在shut down的时候总是不成功,这个时候producer进来的message由于Kafka集群中存在两个kafka controller而无法落地。导致数据淤积。
这里曾经还有一个bug,TalkingData使用Kafka0.8.1的时候,当ack=0的时候,表示producer发送出去message,只要对应的kafka broker topic partition leader接收到的这条message,producer就返回成功,不管partition leader 是否真的成功把message真正存到kafka。当ack=1的时候,表示producer发送出去message,同步的把message存到对应topic的partition的leader上,然后producer就返回成功,partition leader异步的把message同步到其他partition replica上。当ack=all或-1,表示producer发送出去message,同步的把message存到对应topic的partition的leader和对应的replica上之后,才返回成功。但是如果某个kafka controller 切换的时候,会导致partition leader的切换(老的 kafka controller上面的partition leader会选举到其他的kafka broker上),但是这样就会导致丢数据。
–  Consumergroup:各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。所以如果想同时对一个topic做消费的话,启动多个consumer group就可以了,但是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。它不能像AMQ那样可以多个BET作为consumer去互斥的(for update悲观锁)并发处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许同一个consumer group下的一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。如果想多个不同的业务都需要这个topic的数据,起多个consumer group就好了,大家都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
    当启动一个consumer group去消费一个topic的时候,无论topic里面有多个少个partition,无论我们consumer group里面配置了多少个consumer thread,这个consumer group下面的所有consumer thread一定会消费全部的partition;即便这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费所有的partition。因此,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
    同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不能够一个consumer group的多个consumer同时消费一个partition。
    一个consumer group下,无论有多少个consumer,这个consumer group一定回去把这个topic下所有的partition都消费了。当consumer group里面的consumer数量小于这个topic下的partition数量的时候,如下图groupA,groupB,就会出现一个conusmer thread消费多个partition的情况,总之是这个topic下的partition都会被消费。如果consumer group里面的consumer数量等于这个topic下的partition数量的时候,如下图groupC,此时效率是最高的,每个partition都有一个consumer thread去消费。当consumer group里面的consumer数量大于这个topic下的partition数量的时候,如下图GroupD,就会有一个consumer thread空闲。因此,我们在设定consumer group的时候,只需要指明里面有几个consumer数量即可,无需指定对应的消费partition序号,consumer会自动进行rebalance。
    多个Consumer Group下的consumer可以消费同一条message,但是这种消费也是以o(1)的方式顺序的读取message去消费,,所以一定会重复消费这批message的,不能向AMQ那样多个BET作为consumer消费(对message加锁,消费的时候不能重复消费message)
– Consumer Rebalance的触发条件:(1)Consumer增加或删除会触发 Consumer Group的Rebalance(2)Broker的增加或者减少都会触发 Consumer Rebalance
– Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。所以必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由自己维护。一般来说都是使用high level api的。Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也可以配置成读完消息处理再commit,这种情况下consumer端的响应就会比较慢的,需要等处理完才行。
一般情况下,一定是一个consumer group处理一个topic的message。Best Practice是这个consumer group里面consumer的数量等于topic里面partition的数量,这样效率是最高的,一个consumer thread处理一个partition。如果这个consumer group里面consumer的数量小于topic里面partition的数量,就会有consumer thread同时处理多个partition(这个是kafka自动的机制,我们不用指定),但是总之这个topic里面的所有partition都会被处理到的。。如果这个consumer group里面consumer的数量大于topic里面partition的数量,多出的consumer thread就会闲着啥也不干,剩下的是一个consumer thread处理一个partition,这就造成了资源的浪费,因为一个partition不可能被两个consumer thread去处理。所以我们线上的分布式多个service服务,每个service里面的kafka consumer数量都小于对应的topic的partition数量,但是所有服务的consumer数量只和等于partition的数量,这是因为分布式service服务的所有consumer都来自一个consumer group,如果来自不同的consumer group就会处理重复的message了(同一个consumer group下的consumer不能处理同一个partition,不同的consumer group可以处理同一个topic,那么都是顺序处理message,一定会处理重复的。一般这种情况都是两个不同的业务逻辑,才会启动两个consumer group来处理一个topic)。
 
如果producer的流量增大,当前的topic的parition数量=consumer数量,这时候的应对方式就是很想扩展:增加topic下的partition,同时增加这个consumer group下的consumer。
                
– Delivery Mode : Kafka producer 发送message不用维护message的offsite信息,因为这个时候,offsite就相当于一个自增id,producer就尽管发送message就好了。而且Kafka与AMQ不同,AMQ大都用在处理业务逻辑上,而Kafka大都是日志,所以Kafka的producer一般都是大批量的batch发送message,向这个topic一次性发送一大批message,load balance到一个partition上,一起插进去,offsite作为自增id自己增加就好。但是Consumer端是需要维护这个partition当前消费到哪个message的offsite信息的,这个offsite信息,high level api是维护在Zookeeper上,low level api是自己的程序维护。(Kafka管理界面上只能显示high level api的consumer部分,因为low level api的partition offsite信息是程序自己维护,kafka是不知道的,无法在管理界面上展示 )当使用high level api的时候,先拿message处理,再定时自动commit offsite+1(也可以改成手动), 并且kakfa处理message是没有锁操作的。因此如果处理message失败,此时还没有commit offsite+1,当consumer thread重启后会重复消费这个message。但是作为高吞吐量高并发的实时处理系统,at least once的情况下,至少一次会被处理到,是可以容忍的。如果无法容忍,就得使用low level api来自己程序维护这个offsite信息,那么想什么时候commit offsite+1就自己搞定了。
 
– Topic & Partition:Topic相当于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行load balance,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量]  )。物理上存储上,这个topic会分成一个或多个partition,每个partiton相当于是一个子queue。在物理结构上,每个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic可以有无数多的partition,根据业务需求和数据量来设置。在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在创建Topic时通过参数指定parittion数量。Topic创建之后通过Kafka提供的工具也可以修改partiton数量。
   一般来说,(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。(2)同一个Partition的Replica尽量分散到不同的机器,高可用。
  当add a new partition的时候,partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance
– Partition Replica:每个partition可以在其他的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。这样如果某个broker宕机,其实整个kafka内数据依然是完整的。但是,replica副本数越高,系统虽然越稳定,但是回来带资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。
  (1)怎样传送消息:producer先把message发送到partition leader,再由leader发送给其他partition follower。(如果让producer发送给每个replica那就太慢了)
  (2)在向Producer发送ACK前需要保证有多少个Replica已经收到该消息:根据ack配的个数而定
  (3)怎样处理某个Replica不工作的情况:如果这个部工作的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。如果这个不工作的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工作的partition replca写message成功,但是会等到time out,然后返回失败因为某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工作的partition replica从ack列表中移除,以后的producer发送message的时候就不会有这个ack列表下的这个部工作的partition replica了。 
  (4)怎样处理Failed Replica恢复回来的情况:如果这个partition replica之前不在ack列表中,那么启动后重新受Zookeeper管理即可,之后producer发送message的时候,partition leader会继续发送message到这个partition follower上。如果这个partition replica之前在ack列表中,此时重启后,需要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工作的partition replica的时候自动从ack列表中移除的)
– Partition leader与follower:partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其他的broker的partition follower上选择follower变为parition leader。
– Topic分配partition和partition replica的算法:(1)将Broker(size=n)和待分配的Partition排序。(2)将第i个Partition分配到第(i%n)个Broker上。(3)将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上
 
– 消息投递可靠性
一个消息如何算投递成功,Kafka提供了三种模式:
– 第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;
– 第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;
– 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型
  消息在broker上的可靠性,因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。
  消息消费的可靠性,Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况,这种情况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决,但是如果你的应用不在乎重复消费,那就干脆不要解决,以换取最大的性能。
 
– Partition ack:当ack=1,表示producer写partition leader成功后,broker就返回成功,无论其他的partition follower是否写成功。当ack=2,表示producer写partition leader和其他一个follower成功的时候,broker就返回成功,无论其他的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer全部写成功的时候,才算成功,kafka broker才返回成功信息。这里需要注意的是,如果ack=1的时候,一旦有个broker宕机导致partition的follower和leader切换,会导致丢数据。
  
– message状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
– message持久化:Kafka中会把消息持久化到本地文件系统中,并且保持o(1)极高的效率。我们众所周知IO读取是非常耗资源的性能也是最慢的,这就是为了数据库的瓶颈经常在IO上,需要换SSD硬盘的原因。但是Kafka作为吞吐量极高的MQ,却可以非常高效的message持久化到文件。这是因为Kafka是顺序写入o(1)的时间复杂度,速度非常快。也是高吞吐量的原因。由于message的写入持久化是顺序写入的,因此message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。一般的机器,单机每秒100k条数据。
– message有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
– Produer : Producer向Topic发送message,不需要指定partition,直接发送就好了。kafka通过partition ack来控制是否发送成功并把信息返回给producer,producer可以有任意多的thread,这些kafka服务器端是不care的。Producer端的delivery guarantee默认是At least once的。也可以设置Producer异步发送实现At most once。Producer可以用主键幂等性实现Exactly once
– Kafka高吞吐量: Kafka的高吞吐量体现在读写上,分布式并发的读和写都非常快,写的性能体现在以o(1)的时间复杂度进行顺序写入。读的性能体现在以o(1)的时间复杂度进行顺序读取, 对topic进行partition分区,consume group中的consume线程可以以很高能性能进行顺序读。
– Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,但是可能会重复传输;(3)Exactly once每条信息肯定会被传输一次且仅传输一次,这是用户想要的。
– 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
– push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
– Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
– 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
– 同步异步:Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
– 分区机制partition:Kafka的broker端支持消息分区partition,Producer可以决定把消息发到哪个partition,在一个partition 中message的顺序就是Producer发送消息的顺序,一个topic中可以有多个partition,具体partition的数量是可配置的。partition的概念使得kafka作为MQ可以横向扩展,吞吐量巨大。partition可以设置replica副本,replica副本存在不同的kafka broker节点上,第一个partition是leader,其他的是follower,message先写到partition leader上,再由partition leader push到parition follower上。所以说kafka可以水平扩展,也就是扩展partition。
– 离线数据装载:Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。
– 实时数据与离线数据:kafka既支持离线数据也支持实时数据,因为kafka的message持久化到文件,并可以设置有效期,因此可以把kafka作为一个高效的存储来使用,可以作为离线数据供后面的分析。当然作为分布式实时消息系统,大多数情况下还是用于实时的数据处理的,但是当cosumer消费能力下降的时候可以通过message的持久化在淤积数据在kafka。
– 插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
– 解耦:  相当于一个MQ,使得Producer和Consumer之间异步的操作,系统之间解耦
– 冗余:  replica有多个副本,保证一个broker node宕机后不会影响整个服务
– 扩展性:  broker节点可以水平扩展,partition也可以水平增加,partition replica也可以水平增加
– 峰值:  在访问量剧增的情况下,kafka水平扩展, 应用仍然需要继续发挥作用
– 可恢复性:  系统的一部分组件失效时,由于有partition的replica副本,不会影响到整个系统。
– 顺序保证性:由于kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。
– 缓冲:由于producer那面可能业务很简单,而后端consumer业务会很复杂并有数据库的操作,因此肯定是producer会比consumer处理速度快,如果没有kafka,producer直接调用consumer,那么就会造成整个系统的处理速度慢,加一层kafka作为MQ,可以起到缓冲的作用。
– 异步通信:作为MQ,Producer与Consumer异步通信

2.Kafka文件存储机制

2.1 Kafka部分名词解释如下:
 
     Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
  • Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
  • Segment:partition物理上由多个segment组成,每个Segment存着message信息
  • Producer : 生产message发送到topic
  • Consumer : 订阅topic消费message, consumer作为一个线程来消费
  • Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。

  • 2.2 kafka一些原理概念
1.持久化
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.
 
2.性能
除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到”磁盘IO数据”/”内核内存”/”进程内存”/”网络缓冲区”,多者之间的数据copy).
其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.
 
3.负载均衡
kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含”集群中存活的servers列表”/”partitions leader列表”等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”.
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
 
4.Topic模型
其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种”宽松”的设计,将会有”丢失”消息/”消息重发”的危险.
 
5.消息传输一致
Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的情况;
最多1次:可能会出现数据丢失情况;
恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。
 
at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是”at most once”.
at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是”at least once”.
“Kafka Cluster”到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:
最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。
 
6.副本
kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个”consumer”,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower”落后”太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是”committed”,那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.
选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到”负载均衡”,partition leader较少的broker将会更有可能成为新的leader.
 
7.log
每个log entry格式为”4个字节的数字N表示消息的长度” + “N个字节的消息内容”;每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为”最小offset”.kafka.例如”00000000000.kafka”;其中”最小offset”表示此segment中起始消息的offset.
获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.
 
8.分布式
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了”负载均衡”.一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.
Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.
Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了”一个partition”只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些”游离”的partitions)
当consumer启动时,所触发的操作:
A) 首先进行”Consumer id Registry”;
B) 然后在”Consumer id Registry”节点下注册一个watch用来监听当前group中其他consumer的”leave”和”join”;只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
C) 在”Broker id registry”节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
 
总结:
1) Producer端使用zookeeper用来”发现”broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。
 
9.Leader的选择
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
1. 等待ISR中的任何一个节点恢复并担任leader。
2. 选择所有节点中(不只是ISR)第一个恢复的节点作为leader.
这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。
 
10.副本管理
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
 
11.Leader与副本同步
对于某个分区来说,保存正分区的”broker”为该分区的”leader”,保存备份分区的”broker”为该分区的”follower”。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的”broker”上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的”leader”进行通信。
 
Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
1. 节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接。
2. 如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久。
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
 
 
  • 2.3  kafka拓扑结构

       一个典型的Kafka集群中包含若干Producer(可以是web前端FET,或者是服务器日志等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干ConsumerGroup,以及一个Zookeeper集群。Kafka通过Zookeeper管理Kafka集群配置:选举Kafka broker的leader,以及在Consumer Group发生变化时进行rebalance,因为consumer消费kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
 

分析过程分为以下4个步骤:

  • topic中partition存储分布
  • partiton中文件存储方式 (partition在linux服务器上就是一个目录(文件夹))
  • partiton中segment文件存储结构
  • 在partition中如何通过offset查找message

通过上述4过程详细分析,我们就可以清楚认识到kafka文件存储机制的奥秘。

 
2.3 topic中partition存储分布

假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2个topic名 称分别为report_push、launch_info, partitions数量都为partitions=4

存储路径和目录规则为:

xxx/message-folder

  |–report_push-0
  |–report_push-1
  |–report_push-2
  |–report_push-3
  |–launch_info-0
  |–launch_info-1
  |–launch_info-2
  |–launch_info-3
 
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition组成,其组织结构如下图所示:
 
我们可以看到,Partition是一个Queue的结构,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition上,其中的每一个消息都被赋予了一个唯一的offset值。
 
Kafka集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。
 
Kafka只维护在Partition中的offset值,因为这个offsite标识着这个partition的message消费到哪条了。Consumer每消费一个消息,offset就会加1。其实消息的状态完全是由Consumer控制的,Consumer可以跟踪和重设这个offset值,这样的话Consumer就可以读取任意位置的消息。
 
把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;第二就是可以提高并发,因为可以以Partition为单位读写了。
 
通过上面介绍的我们可以知道,kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。如果你的副本数量设置为3,那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。
 
Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的message的数量。Producer在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader作为读写用。
 
关于如何设置partition值需要考虑的因素。一个partition只能被一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于同时运行的consumer的数量。另外一方面,建议partition的数量大于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每个topic都有上百个partition。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,如果partition数量越大,就要为kafka分配更大的heap space。
2.4 partiton中文件存储方式
 
  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
  • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

2.5 partiton中segment文件存储结构
producer发message到某个topic,message会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),kafka broker收到message往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
 
每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
  • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
  • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
 
每个segment中存储很多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

下面文件列表是笔者在Kafka broker上做的一个实验,创建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据,如下图2所示segment文件列表形象说明了上述2个规则:

以上述图2中一对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下:

上述图3中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中 元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移 地址为497。

从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构如下:

参数说明:

关键字解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic” 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据。
 
2.6 在partition中如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

  • 第一步查找segment file

    上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。

    当offset=368776时定位到00000000000000368769.index|log

  • 第二步通过segment file查找message通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到 offset=368776为止。

segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
 
kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2 kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic与partion的组合作为key直接提交到compacted topic中。同时Kafka又在内存中维护了的三元组来维护最新的offset信息,consumer来取最新offset信息的时候直接内存里拿即可。当然,kafka允许你快速的checkpoint最新的offset信息到磁盘上。
 
3.Partition Replication原则

Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
 
 

1. Kafka集群partition replication默认自动分配分析

下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:

(1)

 

 

(2)当集群中新增2节点,Partition增加到6个时分布情况如下:

 

副本分配逻辑规则如下:

  • 在Kafka集群中,每个Broker都有均等分配Partition的Leader机会。
  • 上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。
  • 上述图种每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。
 
副本分配算法如下:
  • 将所有N Broker和待分配的i个Partition排序.
  • 将第i个Partition分配到第(i mod n)个Broker上.
  • 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.
 
4.Kafka Broker一些特性
4.1 无状态的Kafka Broker :
1. Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
2. Broker不保存订阅者的状态,由订阅者自己保存。
3. 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
4. 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset进行重新读取消费消息。
 
4.2 message的交付与生命周期 :
1. 不是严格的JMS, 因此kafka对消息的重复、丢失、错误以及顺序型没有严格的要求。(这是与AMQ最大的区别)
2. kafka提供at-least-once delivery,即当consumer宕机后,有些消息可能会被重复delivery。
3. 因每个partition只会被consumer group内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的订阅。
4. Kafka为每条消息为每条消息计算CRC校验,用于错误检测,crc校验不通过的消息会直接被丢弃掉。
 
4.3 压缩
 
Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩之后,在Consumer端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU。
 
那么如何区分消息是压缩的还是未压缩的呢,Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为0,则表示消息未被压缩。
 
4.4 消息可靠性
 
在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况:
 
– 一个消息发送失败
 
– 一个消息被发送多次
 
– 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次
 
有许多系统声称它们实现了exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息,但是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理取过来的消息时失败了。
 
从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。
 
从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。
 
4.5 备份机制
 
备份机制是Kafka0.8版本的新特性,备份机制的出现大大提高了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了Kafka的备份机制:
 
 
4.6 Kafka高效性相关设计
 
4.6.1 消息的持久化
Kafka高度依赖文件系统来存储和缓存消息(AMQ的nessage是持久化到mysql数据库中的),因为一般的人认为磁盘是缓慢的,这导致人们对持久化结构具有竞争性持怀疑态度。其实,磁盘的快或者慢,这决定于我们如何使用磁盘。因为磁盘线性写的速度远远大于随机写。线性读写在大多数应用场景下是可以预测的。
4.6.2 常数时间性能保证
每个Topic的Partition的是一个大文件夹,里面有无数个小文件夹segment,但partition是一个队列,队列中的元素是segment,消费的时候先从第0个segment开始消费,新来message存在最后一个消息队列中。对于segment也是对队列,队列元素是message,有对应的offsite标识是哪个message。消费的时候先从这个segment的第一个message开始消费,新来的message存在segment的最后。
 
消息系统的持久化队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。它有一个优点,所有的操作都是常数时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。
 
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着我们可以提供一般消息系统无法提供的特性。比如说,消息被消费后不是立马被删除,我们可以将这些消息保留一段相对比较长的时间(比如一个星期)。
 
5.Kafka 生产者-消费者
     消息系统通常都会由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息,不同的MQ实现的Broker实现会有所不同,不过Broker的本质都是要负责将消息落地到服务端的存储系统中。具体步骤如下:
  1. 生产者客户端应用程序产生消息:

    1. 客户端连接对象将消息包装到请求中发送到服务端
    2. 服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来
    3. 服务端返回响应结果给生产者客户端
  2. 消费者客户端应用程序消费消息:

    1. 客户端连接对象将消费信息也包装到请求中发送给服务端
    2. 服务端从文件存储系统中取出消息
    3. 服务端返回响应结果给消费者客户端
    4. 客户端将响应结果还原成消息并开始处理消息
 
                                                                              图4-1 客户端和服务端交互
 
5.1  Producers
 
Producers直接发送消息到broker上的leader partition,不需要经过任何中介或其他路由转发。为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。
 
Producer客户端自己控制着消息被推送到哪些partition。实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的partition,用户可以为每个消息指定一个partitionKey,通过这个key来实现一些hash分区算法。比如,把userid作为partitionkey的话,相同userid的消息将会被推送到同一个partition。
 
以Batch的方式推送数据可以极大的提高处理效率,kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送请求。Batch的数量大小可以通过Producer的参数控制,参数值可以设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。
 
Producers可以异步的并行的向kafka发送消息,但是通常producer在发送完消息之后会得到一个future响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,如果acks设置数量为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
 
若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待直到broker确认收到消息。若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
 
Kafka 消息有一个定长的header和变长的字节数组组成。因为kafka消息支持字节数组,也就使得kafka可以支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但我们推荐消息大小不要超过1MB,通常一般消息大小都在1~10kB之前。
 
发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息,一次行发布),send消息时,producer client需指定消息所属的topic。
 
5.2  Consumers
Kafka提供了两套consumer api,分为high-level api和sample-api。Sample-api 是一个底层的API,它维持了一个和单一broker的连接,并且这个API是完全无状态的,每次请求都需要指定offset值,因此,这套API也是最灵活的。
 
在kafka中,当前读到哪条消息的offset值是由consumer来维护的,因此,consumer可以自己决定如何读取kafka中的数据。比如,consumer可以通过重设offset值来重新消费已消费过的数据。不管有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过期时间,kafka才会删除这些数据。(这一点与AMQ不一样,AMQ的message一般来说都是持久化到mysql中的,消费完的message会被delete掉)
 
High-level API封装了对集群中一系列broker的访问,可以透明的消费一个topic。它自己维持了已消费消息的状态,即每次消费的都是下一个消息。
 
High-level API还支持以组的形式消费topic,如果consumers有同一个组名,那么kafka就相当于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不同的组名,那么此时kafka就相当与一个广播服务,会把topic中的所有消息广播到每个consumer。
 
High level api和Low level api是针对consumer而言的,和producer无关。
 
High level api是consumer读的partition的offsite是存在zookeeper上。High level api 会启动另外一个线程去每隔一段时间,offsite自动同步到zookeeper上。换句话说,如果使用了High level api, 每个message只能被读一次,一旦读了这条message之后,无论我consumer的处理是否ok。High level api的另外一个线程会自动的把offiste+1同步到zookeeper上。如果consumer读取数据出了问题,offsite也会在zookeeper上同步。因此,如果consumer处理失败了,会继续执行下一条。这往往是不对的行为。因此,Best Practice是一旦consumer处理失败,直接让整个conusmer group抛Exception终止,但是最后读的这一条数据是丢失了,因为在zookeeper里面的offsite已经+1了。等再次启动conusmer group的时候,已经从下一条开始读取处理了。
 
Low level api是consumer读的partition的offsite在consumer自己的程序中维护。不会同步到zookeeper上。但是为了kafka manager能够方便的监控,一般也会手动的同步到zookeeper上。这样的好处是一旦读取某个message的consumer失败了,这条message的offsite我们自己维护,我们不会+1。下次再启动的时候,还会从这个offsite开始读。这样可以做到exactly once对于数据的准确性有保证。
 
 
对于Consumer group:
1. 允许consumer group(包含多个consumer,如一个集群同时消费)对一个topic进行消费,不同的consumer group之间独立消费。
2. 为了对减小一个consumer group中不同consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不同的partition。
 
 
Consumer与Partition的关系:
– 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
– 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
– 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
– 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
– High-level接口中获取不到数据的时候是会block的
 
负载低的情况下可以每个线程消费多个partition。但负载高的情况下,Consumer 线程数最好和Partition数量保持一致。如果还是消费不过来,应该再开 Consumer 进程,进程内线程数同样和分区数一致。
 
消费消息时,kafka client需指定topic以及partition number(每个partition对应一个逻辑日志流,如topic代表某个产品线,partition代表产品线的日志按天切分的结果),consumer client订阅后,就可迭代读取消息,如果没有消息,consumer client会阻塞直到有新的消息发布。consumer可以累积确认接收到的消息,当其确认了某个offset的消息,意味着之前的消息也都已成功接收到,此时broker会更新zookeeper上地offset registry。
 
5.3  高效的数据传输
1.  发布者每次可发布多条消息(将消息加到一个消息集合中发布), consumer每次迭代消费一条消息。

2.  不创建单独的cache,使用系统的page cache。发布者顺序发布,订阅者通常比发布者滞后一点点,直接使用Linuxpage cache效果也比较后,同时减少了cache管理及垃圾收集的开销。

3.  使用sendfile优化网络传输,减少一次内存拷贝。
 
6.Kafka 与 Zookeeper
 
6.1 Zookeeper 协调控制
1. 管理broker与consumer的动态加入与离开。(Producer不需要管理,随便一台计算机都可以作为Producer向Kakfa Broker发消息)
2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一
   个consumer group内的多个consumer的消费负载平衡。(因为一个comsumer消费一个或多个partition,一个partition只能被一个consumer消费)

3.  维护消费关系及每个partition的消费信息。

 

6.2 Zookeeper上的细节:

1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。

2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。

3. 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

Redis 持久化之RDB和AOF

转自:ITDragon龙的博客

Redis 有两种持久化方案,RDB (Redis DataBase)和 AOF (Append Only File)。如果你想快速了解和使用RDB和AOF,可以直接跳到文章底部看总结。本章节通过配置文件,触发快照的方式,恢复数据的操作,命令操作演示,优缺点来学习 Redis 的重点知识持久化

RDB 详解

RDB 是 Redis 默认的持久化方案。在指定的时间间隔内,执行指定次数的写操作,则会将内存中的数据写入到磁盘中。即在指定目录下生成一个dump.rdb文件。Redis 重启会通过加载dump.rdb文件恢复数据。

从配置文件了解RDB

打开 redis.conf 文件,找到 SNAPSHOTTING 对应内容
1 RDB核心规则配置(重点)

save <seconds> <changes>
# save ""
save 900 1
save 300 10
save 60 10000

解说:save <指定时间间隔> <执行指定次数更新操作>,满足条件就将内存中的数据同步到硬盘中。官方出厂配置默认是 900秒内有1个更改,300秒内有10个更改以及60秒内有10000个更改,则将内存中的数据快照写入磁盘。
若不想用RDB方案,可以把 save “” 的注释打开,下面三个注释。

2 指定本地数据库文件名,一般采用默认的 dump.rdb

dbfilename dump.rdb

3 指定本地数据库存放目录,一般也用默认配置

dir ./

4 默认开启数据压缩

rdbcompression yes

解说:配置存储至本地数据库时是否压缩数据,默认为yes。Redis采用LZF压缩方式,但占用了一点CPU的时间。若关闭该选项,但会导致数据库文件变的巨大。建议开启。

触发RDB快照

1 在指定的时间间隔内,执行指定次数的写操作
2 执行save(阻塞, 只管保存快照,其他的等待) 或者是bgsave (异步)命令
3 执行flushall 命令,清空数据库所有数据,意义不大。
4 执行shutdown 命令,保证服务器正常关闭且不丢失任何数据,意义…也不大。

通过RDB文件恢复数据

将dump.rdb 文件拷贝到redis的安装目录的bin目录下,重启redis服务即可。在实际开发中,一般会考虑到物理机硬盘损坏情况,选择备份dump.rdb 。可以从下面的操作演示中可以体会到。

RDB 的优缺点

优点:
1 适合大规模的数据恢复。
2 如果业务对数据完整性和一致性要求不高,RDB是很好的选择。

缺点:
1 数据的完整性和一致性不高,因为RDB可能在最后一次备份时宕机了。
2 备份时占用内存,因为Redis 在备份时会独立创建一个子进程,将数据写入到一个临时文件(此时内存中的数据是原来的两倍哦),最后再将临时文件替换之前的备份文件。
所以Redis 的持久化和数据的恢复要选择在夜深人静的时候执行是比较合理的。

操作演示

[root@itdragon bin]# vim redis.conf
save 900 1
save 120 5
save 60 10000
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> set key2 value2
OK
127.0.0.1:6379> set key3 value3
OK
127.0.0.1:6379> set key4 value4
OK
127.0.0.1:6379> set key5 value5
OK
127.0.0.1:6379> set key6 value6
OK
127.0.0.1:6379> SHUTDOWN
not connected> QUIT
[root@itdragon bin]# cp dump.rdb dump_bk.rdb
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> FLUSHALL 
OK
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> SHUTDOWN
not connected> QUIT
[root@itdragon bin]# cp dump_bk.rdb  dump.rdb
cp: overwrite `dump.rdb'? y
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> keys *
1) "key5"
2) "key1"
3) "key3"
4) "key4"
5) "key6"
6) "key2"

第一步:vim 修改持久化配置时间,120秒内修改5次则持久化一次。
第二步:重启服务使配置生效。
第三步:分别set 5个key,过两分钟后,在bin的当前目录下会自动生产一个dump.rdb文件。(set key6 是为了验证shutdown有触发RDB快照的作用)
第四步:将当前的dump.rdb 备份一份(模拟线上工作)。
第五步:执行FLUSHALL命令清空数据库数据(模拟数据丢失)。
第六步:重启Redis服务,恢复数据…..咦????( ′◔ ‸◔`)。数据是空的????这是因为FLUSHALL也有触发RDB快照的功能。
第七步:将备份的 dump_bk.rdb 替换 dump.rdb 然后重新Redis。

注意点:SHUTDOWN 和 FLUSHALL 命令都会触发RDB快照,这是一个坑,请大家注意。

其他命令:

  • keys * 匹配数据库中所有 key
  • save 阻塞触发RDB快照,使其备份数据
  • FLUSHALL 清空整个 Redis 服务器的数据(几乎不用)
  • SHUTDOWN 关机走人(很少用)

AOF 详解

AOF :Redis 默认不开启。它的出现是为了弥补RDB的不足(数据的不一致性),所以它采用日志的形式来记录每个写操作,并追加到文件中。Redis 重启的会根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

从配置文件了解AOF

打开 redis.conf 文件,找到 APPEND ONLY MODE 对应内容
1 redis 默认关闭,开启需要手动把no改为yes

appendonly yes

2 指定本地数据库文件名,默认值为 appendonly.aof

appendfilename "appendonly.aof"

3 指定更新日志条件

# appendfsync always
appendfsync everysec
# appendfsync no

解说:
always:同步持久化,每次发生数据变化会立刻写入到磁盘中。性能较差当数据完整性比较好(慢,安全)
everysec:出厂默认推荐,每秒异步记录一次(默认值)
no:不同步

4 配置重写触发机制

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

解说:当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发。一般都设置为3G,64M太小了。

触发AOF快照

根据配置文件触发,可以是每次执行触发,可以是每秒触发,可以不同步。

根据AOF文件恢复数据

正常情况下,将appendonly.aof 文件拷贝到redis的安装目录的bin目录下,重启redis服务即可。但在实际开发中,可能因为某些原因导致appendonly.aof 文件格式异常,从而导致数据还原失败,可以通过命令redis-check-aof –fix appendonly.aof 进行修复 。从下面的操作演示中体会。

AOF的重写机制

前面也说到了,AOF的工作原理是将写操作追加到文件中,文件的冗余内容会越来越多。所以聪明的 Redis 新增了重写机制。当AOF文件的大小超过所设定的阈值时,Redis就会对AOF文件的内容压缩。

重写的原理:Redis 会fork出一条新进程,读取内存中的数据,并重新写到一个临时文件中。并没有读取旧文件(你都那么大了,我还去读你??? o(゚Д゚)っ傻啊!)。最后替换旧的aof文件。

触发机制:当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发。这里的“一倍”和“64M” 可以通过配置文件修改。

AOF 的优缺点

优点:数据的完整性和一致性更高
缺点:因为AOF记录的内容多,文件会越来越大,数据恢复也会越来越慢。

操作演示

[root@itdragon bin]# vim appendonly.aof
appendonly yes
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> set keyAOf valueAof
OK
127.0.0.1:6379> FLUSHALL 
OK
127.0.0.1:6379> SHUTDOWN
not connected> QUIT
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> keys *
1) "keyAOf"
127.0.0.1:6379> SHUTDOWN
not connected> QUIT
[root@itdragon bin]# vim appendonly.aof
fjewofjwojfoewifjowejfwf
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
Could not connect to Redis at 127.0.0.1:6379: Connection refused
not connected> QUIT
[root@itdragon bin]# redis-check-aof --fix appendonly.aof 
'x              3e: Expected prefix '*', got: '
AOF analyzed: size=92, ok_up_to=62, diff=30
This will shrink the AOF from 92 bytes, with 30 bytes, to 62 bytes
Continue? [y/N]: y
Successfully truncated AOF
[root@itdragon bin]# ./redis-server redis.conf
[root@itdragon bin]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> keys *
1) "keyAOf"

第一步:修改配置文件,开启AOF持久化配置。
第二步:重启Redis服务,并进入Redis 自带的客户端中。
第三步:保存值,然后模拟数据丢失,关闭Redis服务。
第四步:重启服务,发现数据恢复了。(额外提一点:有教程显示FLUSHALL 命令会被写入AOF文件中,导致数据恢复失败。我安装的是redis-4.0.2没有遇到这个问题)。
第五步:修改appendonly.aof,模拟文件异常情况。
第六步:重启 Redis 服务失败。这同时也说明了,RDB和AOF可以同时存在,且优先加载AOF文件。
第七步:校验appendonly.aof 文件。重启Redis 服务后正常。

补充点:aof 的校验是通过 redis-check-aof 文件,那么rdb 的校验是不是可以通过 redis-check-rdb 文件呢???

总结

  1. Redis 默认开启RDB持久化方式,在指定的时间间隔内,执行指定次数的写操作,则将内存中的数据写入到磁盘中。
  2. RDB 持久化适合大规模的数据恢复但它的数据一致性和完整性较差。
  3. Redis 需要手动开启AOF持久化方式,默认是每秒将写操作日志追加到AOF文件中。
  4. AOF 的数据完整性比RDB高,但记录内容多了,会影响数据恢复的效率。
  5. Redis 针对 AOF文件大的问题,提供重写的瘦身机制。
  6. 若只打算用Redis 做缓存,可以关闭持久化。
  7. 若打算使用Redis 的持久化。建议RDB和AOF都开启。其实RDB更适合做数据的备份,留一后手。AOF出问题了,还有RDB。

Java 8系列之重新认识HashMap

转自:美团技术博客

HashMap是Java程序员使用频率最高的用于映射(键值对)处理的数据类型。随着JDK(Java Developmet Kit)版本的更新,JDK1.8对HashMap底层的实现进行了优化,例如引入红黑树的数据结构和扩容的优化等。本文结合JDK1.7和JDK1.8的区别,深入探讨HashMap的结构实现和功能原理。

Java为数据结构中的映射定义了一个接口java.util.Map,此接口主要有四个常用的实现类,分别是HashMap、Hashtable、LinkedHashMap和TreeMap,类继承关系如下图所示:

下面针对各个实现类的特点做一些说明:

(1) HashMap:它根据键的hashCode值存储数据,大多数情况下可以直接定位到它的值,因而具有很快的访问速度,但遍历顺序却是不确定的。 HashMap最多只允许一条记录的键为null,允许多条记录的值为null。HashMap非线程安全,即任一时刻可以有多个线程同时写HashMap,可能会导致数据的不一致。如果需要满足线程安全,可以用 Collections的synchronizedMap方法使HashMap具有线程安全的能力,或者使用ConcurrentHashMap。

(2) Hashtable:Hashtable是遗留类,很多映射的常用功能与HashMap类似,不同的是它承自Dictionary类,并且是线程安全的,任一时间只有一个线程能写Hashtable,并发性不如ConcurrentHashMap,因为ConcurrentHashMap引入了分段锁。Hashtable不建议在新代码中使用,不需要线程安全的场合可以用HashMap替换,需要线程安全的场合可以用ConcurrentHashMap替换。

(3) LinkedHashMap:LinkedHashMap是HashMap的一个子类,保存了记录的插入顺序,在用Iterator遍历LinkedHashMap时,先得到的记录肯定是先插入的,也可以在构造时带参数,按照访问次序排序。

(4) TreeMap:TreeMap实现SortedMap接口,能够把它保存的记录根据键排序,默认是按键值的升序排序,也可以指定排序的比较器,当用Iterator遍历TreeMap时,得到的记录是排过序的。如果使用排序的映射,建议使用TreeMap。在使用TreeMap时,key必须实现Comparable接口或者在构造TreeMap传入自定义的Comparator,否则会在运行时抛出java.lang.ClassCastException类型的异常。

对于上述四种Map类型的类,要求映射中的key是不可变对象。不可变对象是该对象在创建后它的哈希值不会被改变。如果对象的哈希值发生变化,Map对象很可能就定位不到映射的位置了。

通过上面的比较,我们知道了HashMap是Java的Map家族中一个普通成员,鉴于它可以满足大多数场景的使用条件,所以是使用频度最高的一个。下文我们主要结合源码,从存储结构、常用方法分析、扩容以及安全性等方面深入讲解HashMap的工作原理。

搞清楚HashMap,首先需要知道HashMap是什么,即它的存储结构-字段;其次弄明白它能干什么,即它的功能实现-方法。下面我们针对这两个方面详细展开讲解。

存储结构-字段

从结构实现来讲,HashMap是数组+链表+红黑树(JDK1.8增加了红黑树部分)实现的,如下如所示。

这里需要讲明白两个问题:数据底层具体存储的是什么?这样的存储方式有什么优点呢?

(1) 从源码可知,HashMap类中有一个非常重要的字段,就是 Node[] table,即哈希桶数组,明显它是一个Node的数组。我们来看Node[JDK1.8]是何物。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;    //用来定位数组索引位置
        final K key;
        V value;
        Node<K,V> next;   //链表的下一个node

        Node(int hash, K key, V value, Node<K,V> next) { ... }
        public final K getKey(){ ... }
        public final V getValue() { ... }
        public final String toString() { ... }
        public final int hashCode() { ... }
        public final V setValue(V newValue) { ... }
        public final boolean equals(Object o) { ... }
}

Node是HashMap的一个内部类,实现了Map.Entry接口,本质是就是一个映射(键值对)。上图中的每个黑色圆点就是一个Node对象。

(2) HashMap就是使用哈希表来存储的。哈希表为解决冲突,可以采用开放地址法和链地址法等来解决问题,Java中HashMap采用了链地址法。链地址法,简单来说,就是数组加链表的结合。在每个数组元素上都一个链表结构,当数据被Hash后,得到数组下标,把数据放在对应下标元素的链表上。例如程序执行下面代码:

    map.put("美团","小美");

系统将调用”美团”这个key的hashCode()方法得到其hashCode 值(该方法适用于每个Java对象),然后再通过Hash算法的后两步运算(高位运算和取模运算,下文有介绍)来定位该键值对的存储位置,有时两个key会定位到相同的位置,表示发生了Hash碰撞。当然Hash算法计算结果越分散均匀,Hash碰撞的概率就越小,map的存取效率就会越高。

如果哈希桶数组很大,即使较差的Hash算法也会比较分散,如果哈希桶数组数组很小,即使好的Hash算法也会出现较多碰撞,所以就需要在空间成本和时间成本之间权衡,其实就是在根据实际情况确定哈希桶数组的大小,并在此基础上设计好的hash算法减少Hash碰撞。那么通过什么方式来控制map使得Hash碰撞的概率又小,哈希桶数组(Node[] table)占用空间又少呢?答案就是好的Hash算法和扩容机制。

在理解Hash和扩容流程之前,我们得先了解下HashMap的几个字段。从HashMap的默认构造函数源码可知,构造函数就是对下面几个字段进行初始化,源码如下:

     int threshold;             // 所能容纳的key-value对极限 
     final float loadFactor;    // 负载因子
     int modCount;  
     int size;  

首先,Node[] table的初始化长度length(默认值是16),Load factor为负载因子(默认值是0.75),threshold是HashMap所能容纳的最大数据量的Node(键值对)个数。threshold = length * Load factor。也就是说,在数组定义好长度之后,负载因子越大,所能容纳的键值对个数越多。

结合负载因子的定义公式可知,threshold就是在此Load factor和length(数组长度)对应下允许的最大元素数目,超过这个数目就重新resize(扩容),扩容后的HashMap容量是之前容量的两倍。默认的负载因子0.75是对空间和时间效率的一个平衡选择,建议大家不要修改,除非在时间和空间比较特殊的情况下,如果内存空间很多而又对时间效率要求很高,可以降低负载因子Load factor的值;相反,如果内存空间紧张而对时间效率要求不高,可以增加负载因子loadFactor的值,这个值可以大于1。

size这个字段其实很好理解,就是HashMap中实际存在的键值对数量。注意和table的长度length、容纳最大键值对数量threshold的区别。而modCount字段主要用来记录HashMap内部结构发生变化的次数,主要用于迭代的快速失败。强调一点,内部结构发生变化指的是结构发生变化,例如put新键值对,但是某个key对应的value值被覆盖不属于结构变化。

在HashMap中,哈希桶数组table的长度length大小必须为2的n次方(一定是合数),这是一种非常规的设计,常规的设计是把桶的大小设计为素数。相对来说素数导致冲突的概率要小于合数,具体证明可以参考http://blog.csdn.net/liuqiyao_01/article/details/14475159,Hashtable初始化桶大小为11,就是桶大小设计为素数的应用(Hashtable扩容后不能保证还是素数)。HashMap采用这种非常规设计,主要是为了在取模和扩容时做优化,同时为了减少冲突,HashMap定位哈希桶索引位置时,也加入了高位参与运算的过程。

这里存在一个问题,即使负载因子和Hash算法设计的再合理,也免不了会出现拉链过长的情况,一旦出现拉链过长,则会严重影响HashMap的性能。于是,在JDK1.8版本中,对数据结构做了进一步的优化,引入了红黑树。而当链表长度太长(默认超过8)时,链表就转换为红黑树,利用红黑树快速增删改查的特点提高HashMap的性能,其中会用到红黑树的插入、删除、查找等算法。本文不再对红黑树展开讨论,想了解更多红黑树数据结构的工作原理可以参考http://blog.csdn.net/v_july_v/article/details/6105630

功能实现-方法

HashMap的内部功能实现很多,本文主要从根据key获取哈希桶数组索引位置、put方法的详细执行、扩容过程三个具有代表性的点深入展开讲解。

1. 确定哈希桶数组索引位置

不管增加、删除、查找键值对,定位到哈希桶数组的位置都是很关键的第一步。前面说过HashMap的数据结构是数组和链表的结合,所以我们当然希望这个HashMap里面的元素位置尽量分布均匀些,尽量使得每个位置上的元素数量只有一个,那么当我们用hash算法求得这个位置的时候,马上就可以知道对应位置的元素就是我们要的,不用遍历链表,大大优化了查询的效率。HashMap定位数组索引位置,直接决定了hash方法的离散性能。先看看源码的实现(方法一+方法二):

方法一:
static final int hash(Object key) {   //jdk1.8 & jdk1.7
     int h;
     // h = key.hashCode() 为第一步 取hashCode值
     // h ^ (h >>> 16)  为第二步 高位参与运算
     return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
方法二:
static int indexFor(int h, int length) {  //jdk1.7的源码,jdk1.8没有这个方法,但是实现原理一样的
     return h & (length-1);  //第三步 取模运算
}

这里的Hash算法本质上就是三步:取key的hashCode值、高位运算、取模运算

对于任意给定的对象,只要它的hashCode()返回值相同,那么程序调用方法一所计算得到的Hash码值总是相同的。我们首先想到的就是把hash值对数组长度取模运算,这样一来,元素的分布相对来说是比较均匀的。但是,模运算的消耗还是比较大的,在HashMap中是这样做的:调用方法二来计算该对象应该保存在table数组的哪个索引处。

这个方法非常巧妙,它通过h & (table.length -1)来得到该对象的保存位,而HashMap底层数组的长度总是2的n次方,这是HashMap在速度上的优化。当length总是2的n次方时,h& (length-1)运算等价于对length取模,也就是h%length,但是&比%具有更高的效率。

在JDK1.8的实现中,优化了高位运算的算法,通过hashCode()的高16位异或低16位实现的:(h = k.hashCode()) ^ (h >>> 16),主要是从速度、功效、质量来考虑的,这么做可以在数组table的length比较小的时候,也能保证考虑到高低Bit都参与到Hash的计算中,同时不会有太大的开销。

下面举例说明下,n为table的长度。

2. 分析HashMap的put方法

HashMap的put方法执行过程可以通过下图来理解,自己有兴趣可以去对比源码更清楚地研究学习。

>

①.判断键值对数组table[i]是否为空或为null,否则执行resize()进行扩容;

②.根据键值key计算hash值得到插入的数组索引i,如果table[i]==null,直接新建节点添加,转向⑥,如果table[i]不为空,转向③;

③.判断table[i]的首个元素是否和key一样,如果相同直接覆盖value,否则转向④,这里的相同指的是hashCode以及equals;

④.判断table[i] 是否为treeNode,即table[i] 是否是红黑树,如果是红黑树,则直接在树中插入键值对,否则转向⑤;

⑤.遍历table[i],判断链表长度是否大于8,大于8的话把链表转换为红黑树,在红黑树中执行插入操作,否则进行链表的插入操作;遍历过程中若发现key已经存在直接覆盖value即可;

⑥.插入成功后,判断实际存在的键值对数量size是否超多了最大容量threshold,如果超过,进行扩容。

JDK1.8HashMap的put方法源码如下:

 1 public V put(K key, V value) {
 2     // 对key的hashCode()做hash
 3     return putVal(hash(key), key, value, false, true);
 4 }
 5 
 6 final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
 7                boolean evict) {
 8     Node<K,V>[] tab; Node<K,V> p; int n, i;
 9     // 步骤①:tab为空则创建
10     if ((tab = table) == null || (n = tab.length) == 0)
11         n = (tab = resize()).length;
12     // 步骤②:计算index,并对null做处理 
13     if ((p = tab[i = (n - 1) & hash]) == null) 
14         tab[i] = newNode(hash, key, value, null);
15     else {
16         Node<K,V> e; K k;
17         // 步骤③:节点key存在,直接覆盖value
18         if (p.hash == hash &&
19             ((k = p.key) == key || (key != null && key.equals(k))))
20             e = p;
21         // 步骤④:判断该链为红黑树
22         else if (p instanceof TreeNode)
23             e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
24         // 步骤⑤:该链为链表
25         else {
26             for (int binCount = 0; ; ++binCount) {
27                 if ((e = p.next) == null) {
28                     p.next = newNode(hash, key,value,null);
                        //链表长度大于8转换为红黑树进行处理
29                     if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st  
30                         treeifyBin(tab, hash);
31                     break;
32                 }
                    // key已经存在直接覆盖value
33                 if (e.hash == hash &&
34                     ((k = e.key) == key || (key != null && key.equals(k)))) 
35							break;
36                 p = e;
37             }
38         }
39         
40         if (e != null) { // existing mapping for key
41             V oldValue = e.value;
42             if (!onlyIfAbsent || oldValue == null)
43                 e.value = value;
44             afterNodeAccess(e);
45             return oldValue;
46         }
47     }

48     ++modCount;
49     // 步骤⑥:超过最大容量 就扩容
50     if (++size > threshold)
51         resize();
52     afterNodeInsertion(evict);
53     return null;
54 }

3. 扩容机制

扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素。当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶。

我们分析下resize的源码,鉴于JDK1.8融入了红黑树,较复杂,为了便于理解我们仍然使用JDK1.7的代码,好理解一些,本质上区别不大,具体区别后文再说。

 1 void resize(int newCapacity) {   //传入新的容量
 2     Entry[] oldTable = table;    //引用扩容前的Entry数组
 3     int oldCapacity = oldTable.length;         
 4     if (oldCapacity == MAXIMUM_CAPACITY) {  //扩容前的数组大小如果已经达到最大(2^30)了
 5         threshold = Integer.MAX_VALUE; //修改阈值为int的最大值(2^31-1),这样以后就不会扩容了
 6         return;
 7     }
 8  
 9     Entry[] newTable = new Entry[newCapacity];  //初始化一个新的Entry数组
10     transfer(newTable);                         //!!将数据转移到新的Entry数组里
11     table = newTable;                           //HashMap的table属性引用新的Entry数组
12     threshold = (int)(newCapacity * loadFactor);//修改阈值
13 }

这里就是使用一个容量更大的数组来代替已有的容量小的数组,transfer()方法将原有Entry数组的元素拷贝到新的Entry数组里。

 1 void transfer(Entry[] newTable) {
 2     Entry[] src = table;                   //src引用了旧的Entry数组
 3     int newCapacity = newTable.length;
 4     for (int j = 0; j < src.length; j++) { //遍历旧的Entry数组
 5         Entry<K,V> e = src[j];             //取得旧Entry数组的每个元素
 6         if (e != null) {
 7             src[j] = null;//释放旧Entry数组的对象引用(for循环后,旧的Entry数组不再引用任何对象)
 8             do {
 9                 Entry<K,V> next = e.next;
10                 int i = indexFor(e.hash, newCapacity); //!!重新计算每个元素在数组中的位置
11                 e.next = newTable[i]; //标记[1]
12                 newTable[i] = e;      //将元素放在数组上
13                 e = next;             //访问下一个Entry链上的元素
14             } while (e != null);
15         }
16     }
17 } 

newTable[i]的引用赋给了e.next,也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置;这样先放在一个索引上的元素终会被放到Entry链的尾部(如果发生了hash冲突的话),这一点和Jdk1.8有区别,下文详解。在旧数组中同一条Entry链上的元素,通过重新计算索引位置后,有可能被放到了新数组的不同位置上。

下面举个例子说明下扩容过程。假设了我们的hash算法就是简单的用key mod 一下表的大小(也就是数组的长度)。其中的哈希桶数组table的size=2, 所以key = 3、7、5,put顺序依次为 5、7、3。在mod 2以后都冲突在table[1]这里了。这里假设负载因子 loadFactor=1,即当键值对的实际大小size 大于 table的实际大小时进行扩容。接下来的三个步骤是哈希桶数组 resize成4,然后所有的Node重新rehash的过程。

下面我们讲解下JDK1.8做了哪些优化。经过观测可以发现,我们使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。看下图可以明白这句话的意思,n为table的长度,图(a)表示扩容前的key1和key2两种key确定索引位置的示例,图(b)表示扩容后key1和key2两种key确定索引位置的示例,其中hash1是key1对应的哈希与高位运算结果。

元素在重新计算hash之后,因为n变为2倍,那么n-1的mask范围在高位多1bit(红色),因此新的index就会发生这样的变化:

因此,我们在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”,可以看看下图为16扩充为32的resize示意图:

这个设计确实非常的巧妙,既省去了重新计算hash值的时间,而且同时,由于新增的1bit是0还是1可以认为是随机的,因此resize的过程,均匀的把之前的冲突的节点分散到新的bucket了。这一块就是JDK1.8新增的优化点。有一点注意区别,JDK1.7中rehash的时候,旧链表迁移新链表的时候,如果在新表的数组索引位置相同,则链表元素会倒置,但是从上图可以看出,JDK1.8不会倒置。有兴趣的同学可以研究下JDK1.8的resize源码,写的很赞,如下:

 1 final Node<K,V>[] resize() {
 2     Node<K,V>[] oldTab = table;
 3     int oldCap = (oldTab == null) ? 0 : oldTab.length;
 4     int oldThr = threshold;
 5     int newCap, newThr = 0;
 6     if (oldCap > 0) {
 7         // 超过最大值就不再扩充了,就只好随你碰撞去吧
 8         if (oldCap >= MAXIMUM_CAPACITY) {
 9             threshold = Integer.MAX_VALUE;
10             return oldTab;
11         }
12         // 没超过最大值,就扩充为原来的2倍
13         else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
14                  oldCap >= DEFAULT_INITIAL_CAPACITY)
15             newThr = oldThr << 1; // double threshold
16     }
17     else if (oldThr > 0) // initial capacity was placed in threshold
18         newCap = oldThr;
19     else {               // zero initial threshold signifies using defaults
20         newCap = DEFAULT_INITIAL_CAPACITY;
21         newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
22     }
23     // 计算新的resize上限
24     if (newThr == 0) {
25 
26         float ft = (float)newCap * loadFactor;
27         newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
28                   (int)ft : Integer.MAX_VALUE);
29     }
30     threshold = newThr;
31     @SuppressWarnings({"rawtypes","unchecked"})
32         Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
33     table = newTab;
34     if (oldTab != null) {
35         // 把每个bucket都移动到新的buckets中
36         for (int j = 0; j < oldCap; ++j) {
37             Node<K,V> e;
38             if ((e = oldTab[j]) != null) {
39                 oldTab[j] = null;
40                 if (e.next == null)
41                     newTab[e.hash & (newCap - 1)] = e;
42                 else if (e instanceof TreeNode)
43                     ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
44                 else { // 链表优化重hash的代码块
45                     Node<K,V> loHead = null, loTail = null;
46                     Node<K,V> hiHead = null, hiTail = null;
47                     Node<K,V> next;
48                     do {
49                         next = e.next;
50                         // 原索引
51                         if ((e.hash & oldCap) == 0) {
52                             if (loTail == null)
53                                 loHead = e;
54                             else
55                                 loTail.next = e;
56                             loTail = e;
57                         }
58                         // 原索引+oldCap
59                         else {
60                             if (hiTail == null)
61                                 hiHead = e;
62                             else
63                                 hiTail.next = e;
64                             hiTail = e;
65                         }
66                     } while ((e = next) != null);
67                     // 原索引放到bucket里
68                     if (loTail != null) {
69                         loTail.next = null;
70                         newTab[j] = loHead;
71                     }
72                     // 原索引+oldCap放到bucket里
73                     if (hiTail != null) {
74                         hiTail.next = null;
75                         newTab[j + oldCap] = hiHead;
76                     }
77                 }
78             }
79         }
80     }
81     return newTab;
82 }

在多线程使用场景中,应该尽量避免使用线程不安全的HashMap,而使用线程安全的ConcurrentHashMap。那么为什么说HashMap是线程不安全的,下面举例子说明在并发的多线程使用场景中使用HashMap可能造成死循环。代码例子如下(便于理解,仍然使用JDK1.7的环境):

public class HashMapInfiniteLoop {  

    private static HashMap<Integer,String> map = new HashMap<Integer,String>(20.75f);  
    public static void main(String[] args) {  
        map.put(5"C");  

        new Thread("Thread1") {  
            public void run() {  
                map.put(7, "B");  
                System.out.println(map);  
            };  
        }.start();  
        new Thread("Thread2") {  
            public void run() {  
                map.put(3, "A);  
                System.out.println(map);  
            };  
        }.start();        
    }  
} 

其中,map初始化为一个长度为2的数组,loadFactor=0.75,threshold=2*0.75=1,也就是说当put第二个key的时候,map就需要进行resize。

通过设置断点让线程1和线程2同时debug到transfer方法(3.3小节代码块)的首行。注意此时两个线程已经成功添加数据。放开thread1的断点至transfer方法的“Entry next = e.next;” 这一行;然后放开线程2的的断点,让线程2进行resize。结果如下图。

注意,Thread1的 e 指向了key(3),而next指向了key(7),其在线程二rehash后,指向了线程二重组后的链表。

线程一被调度回来执行,先是执行 newTalbe[i] = e, 然后是e = next,导致了e指向了key(7),而下一次循环的next = e.next导致了next指向了key(3)。

e.next = newTable[i] 导致 key(3).next 指向了 key(7)。注意:此时的key(7).next 已经指向了key(3), 环形链表就这样出现了。

于是,当我们用线程一调用map.get(11)时,悲剧就出现了——Infinite Loop。

HashMap中,如果key经过hash算法得出的数组索引位置全部不相同,即Hash算法非常好,那样的话,getKey方法的时间复杂度就是O(1),如果Hash算法技术的结果碰撞非常多,假如Hash算极其差,所有的Hash算法结果得出的索引位置一样,那样所有的键值对都集中到一个桶中,或者在一个链表中,或者在一个红黑树中,时间复杂度分别为O(n)和O(lgn)。 鉴于JDK1.8做了多方面的优化,总体性能优于JDK1.7,下面我们从两个方面用例子证明这一点。

Hash较均匀的情况

为了便于测试,我们先写一个类Key,如下:

class Key implements Comparable<Key> {

    private final int value;

    Key(int value) {
        this.value = value;
    }

    @Override
    public int compareTo(Key o) {
        return Integer.compare(this.value, o.value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Key key = (Key) o;
        return value == key.value;
    }

    @Override
    public int hashCode() {
        return value;
    }
}

这个类复写了equals方法,并且提供了相当好的hashCode函数,任何一个值的hashCode都不会相同,因为直接使用value当做hashcode。为了避免频繁的GC,我将不变的Key实例缓存了起来,而不是一遍一遍的创建它们。代码如下:

public class Keys {

    public static final int MAX_KEY = 10_000_000;
    private static final Key[] KEYS_CACHE = new Key[MAX_KEY];

    static {
        for (int i = 0; i < MAX_KEY; ++i) {
            KEYS_CACHE[i] = new Key(i);
        }
    }

    public static Key of(int value) {
        return KEYS_CACHE[value];
    }
}

现在开始我们的试验,测试需要做的仅仅是,创建不同size的HashMap(1、10、100、……10000000),屏蔽了扩容的情况,代码如下:

   static void test(int mapSize) {

        HashMap<Key, Integer> map = new HashMap<Key,Integer>(mapSize);
        for (int i = 0; i < mapSize; ++i) {
            map.put(Keys.of(i), i);
        }

        long beginTime = System.nanoTime(); //获取纳秒
        for (int i = 0; i < mapSize; i++) {
            map.get(Keys.of(i));
        }
        long endTime = System.nanoTime();
        System.out.println(endTime - beginTime);
    }

    public static void main(String[] args) {
        for(int i=10;i<= 1000 0000;i*= 10){
            test(i);
        }
    }


在测试中会查找不同的值,然后度量花费的时间,为了计算getKey的平均时间,我们遍历所有的get方法,计算总的时间,除以key的数量,计算一个平均值,主要用来比较,绝对值可能会受很多环境因素的影响。结果如下:

通过观测测试结果可知,JDK1.8的性能要高于JDK1.7 15%以上,在某些size的区域上,甚至高于100%。由于Hash算法较均匀,JDK1.8引入的红黑树效果不明显,下面我们看看Hash不均匀的的情况。

Hash极不均匀的情况

假设我们又一个非常差的Key,它们所有的实例都返回相同的hashCode值。这是使用HashMap最坏的情况。代码修改如下:

class Key implements Comparable<Key> {

    //...

    @Override
    public int hashCode() {
        return 1;
    }
}

仍然执行main方法,得出的结果如下表所示:

从表中结果中可知,随着size的变大,JDK1.7的花费时间是增长的趋势,而JDK1.8是明显的降低趋势,并且呈现对数增长稳定。当一个链表太长的时候,HashMap会动态的将它替换成一个红黑树,这话的话会将时间复杂度从O(n)降为O(logn)。hash算法均匀和不均匀所花费的时间明显也不相同,这两种情况的相对比较,可以说明一个好的hash算法的重要性。

      测试环境:处理器为2.2 GHz Intel Core i7,内存为16 GB 1600 MHz DDR3,SSD硬盘,使用默认的JVM参数,运行在64位的OS X 10.10.1上。

(1) 扩容是一个特别耗性能的操作,所以当程序员在使用HashMap的时候,估算map的大小,初始化的时候给一个大致的数值,避免map进行频繁的扩容。

(2) 负载因子是可以修改的,也可以大于1,但是建议不要轻易修改,除非情况非常特殊。

(3) HashMap是线程不安全的,不要在并发的环境中同时操作HashMap,建议使用ConcurrentHashMap。

(4) JDK1.8引入红黑树大程度优化了HashMap的性能。

(5) 还没升级JDK1.8的,现在开始升级吧。HashMap的性能提升仅仅是JDK1.8的冰山一角。

  1. JDK1.7&JDK1.8 源码。
  2. CSDN博客频道,HashMap多线程死循环问题,2014。
  3. 红黑联盟,Java类集框架之HashMap(JDK1.8)源码剖析,2015。
  4. CSDN博客频道, 教你初步了解红黑树,2010。
  5. Java Code Geeks,HashMap performance improvements in Java 8,2014。
  6. Importnew,危险!在HashMap中将可变对象用作Key,2014。
  7. CSDN博客频道,为什么一般hashtable的桶数会取一个素数,2013。

JAVA NIO基本使用

NIO中有几个核心对象:缓冲区(Buffer)、通道(Channel)、选择器(Selector)

缓冲区Buffer

缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组,在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据,都是将它放到缓冲区中。而在面向流I/O系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。

import java.nio.IntBuffer;
 
public class TestIntBuffer {
	public static void main(String[] args) {
		// 分配新的int缓冲区,参数为缓冲区容量
		// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。它将具有一个底层实现数组,其数组偏移量将为零。
		IntBuffer buffer = IntBuffer.allocate(8);
 
		for (int i = 0; i < buffer.capacity(); ++i) {
			int j = 2 * (i + 1);
			// 将给定整数写入此缓冲区的当前位置,当前位置递增
			buffer.put(j);
		}
 
		// 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
		buffer.flip();
 
		// 查看在当前位置和限制位置之间是否有元素
		while (buffer.hasRemaining()) {
			// 读取此缓冲区当前位置的整数,然后当前位置递增
			int j = buffer.get();
			System.out.print(j + "  ");
		}
 
	}
 
}

通道Channel

通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过Buffer对象来处理。我们永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

使用NIO读取数据

在前面我们说过,任何时候读取数据,都不是直接从通道读取,而是从通道读取到缓冲区。所以使用NIO读取数据可以分为下面三个步骤: 
1. 从FileInputStream获取Channel 
2. 创建Buffer 
3. 将数据从Channel读取到Buffer中

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
 
public class Program {
    static public void main( String args[] ) throws Exception {
        FileInputStream fin = new FileInputStream("c:\\test.txt");
        
        // 获取通道
        FileChannel fc = fin.getChannel();
        
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        
        // 读取数据到缓冲区
        fc.read(buffer);
        
        buffer.flip();
        
        while (buffer.remaining()>0) {
            byte b = buffer.get();
            System.out.print(((char)b));
        }
        
        fin.close();
    }
}

使用NIO写入数据

使用NIO写入数据与读取数据的过程类似,同样数据不是直接写入通道,而是写入缓冲区,可以分为下面三个步骤: 
1. 从FileInputStream获取Channel 
2. 创建Buffer 
3. 将数据从Channel写入到Buffer中

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
 
public class Program {
    static private final byte message[] = { 83, 111, 109, 101, 32,
        98, 121, 116, 101, 115, 46 };
 
    static public void main( String args[] ) throws Exception {
        FileOutputStream fout = new FileOutputStream( "c:\\test.txt" );
        
        FileChannel fc = fout.getChannel();
        
        ByteBuffer buffer = ByteBuffer.allocate( 1024 );
        
        for (int i=0; i<message.length; ++i) {
            buffer.put( message[i] );
        }
        
        buffer.flip();
        
        fc.write( buffer );
        
        fout.close();
    }

选择器Selector注册和处理事件

NIO中实现非阻塞I/O的核心对象就是Selector,Selector就是注册各种I/O事件地 方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件. 当有读或写等任何注册的事件发生时,可以从Selector中获得相应的SelectionKey,同时从 SelectionKey中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据

使用NIO中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:

1. 向Selector对象注册感兴趣的事件 
2. 从Selector中获取感兴趣的事件 
3. 根据不同的事件进行相应的处理

/*
 * 注册事件
 * */
protected Selector getSelector() throws IOException {
    // 创建Selector对象
    Selector sel = Selector.open();
    
    // 创建可选择通道,并配置为非阻塞模式
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    
    // 绑定通道到指定端口
    ServerSocket socket = server.socket();
    InetSocketAddress address = new InetSocketAddress(port);
    socket.bind(address);
    
    // 向Selector中注册感兴趣的事件
    server.register(sel, SelectionKey.OP_ACCEPT); 
    return sel;
}

/*
 * 开始监听
 * */ 
public void listen() { 
    System.out.println("listen on " + port);
    try { 
        while(true) { 
            // 该调用会阻塞,直到至少有一个事件发生
            selector.select(); 
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) { 
                SelectionKey key = (SelectionKey) iter.next(); 
                iter.remove(); 
                process(key); 
            } 
        } 
    } catch (IOException e) { 
        e.printStackTrace();
    } 
}

/*
 * 根据不同的事件做处理
 * */
protected void process(SelectionKey key) throws IOException{
    // 接收请求
    if (key.isAcceptable()) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel = server.accept();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);
    }
    // 读信息
    else if (key.isReadable()) {
        SocketChannel channel = (SocketChannel) key.channel(); 
        int count = channel.read(buffer); 
        if (count > 0) { 
            buffer.flip(); 
            CharBuffer charBuffer = decoder.decode(buffer); 
            name = charBuffer.toString(); 
            SelectionKey sKey = channel.register(selector, SelectionKey.OP_WRITE); 
            sKey.attach(name); 
        } else { 
            channel.close(); 
        } 
        buffer.clear(); 
    }
    // 写事件
    else if (key.isWritable()) {
        SocketChannel channel = (SocketChannel) key.channel(); 
        String name = (String) key.attachment(); 
        
        ByteBuffer block = encoder.encode(CharBuffer.wrap("Hello " + name)); 
        if(block != null)
        {
            channel.write(block);
        }
        else
        {
            channel.close();
        }
 
     }
}

JAVA NIO的selector的实现原理 epoll

Java NIO的核心类库多路复用器Selector就是基于epoll的多路复用技术实现的 

相比select、poll系统调用,epoll有如下优点:

1.支持一个进程打开的socket描述符(FD)不受限制,仅受限于操作系统的最大文件句柄数。
select最大的缺陷是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。可以选择修改这个宏后重新编译内核,但这对带来网络效率的下降。也可以选择多进程的方案(传统的Apache方案)来解决这个问题,但进程的创建成本,以及进程间的数据交换需要进行考虑。
epoll具体支持的FD上线值可以通过cat /proc/sys/fs/file-max查看,这个值和系统的内存关系比较大。

2.I/O效率不会随着FD数目的增加而线性下降。
当拥有一个很大的socket集合时,由于网络延时或者链路空闲,任一时刻只有少部分的socket是“活跃”的,但select/poll每次调用都会线性扫描全部的集合,导致效率线性下降。而epoll只会对活跃的socket进行操作-只有活跃的socket才会主动调用callback函数,所以只需要遍历那些被内核I/O事件异步唤醒而加入Ready队列的FD集合。

3.使用mmap加速内核与用户空间的消息传递
epoll通过内核和用户空间mmap同一块内存来实现的。mmap()系统调用允许存放在块设备上的文件或信息的一部分映射到进程的部分地址空间。

看到这里,或许会产生一个疑问,为什么内核要和用户空间进行消息传递呢?

下面我来进行深入解析:

首先,操作系统必须完成的两个主要目标:

1,与硬件部分交互

2,为运行在计算机系统上的应用程序(即所谓的用户程序)提供执行环境。

而现代操作系统是禁止用户程序直接与底层硬件进行交互的,或者禁止直接访问任意的物理地址。为此,硬件为CPU引入了至少两种不同的执行模式:用户程序的非特权模式和内核的特权模式。Unix把它们分别称为用户态(User Mode)和内核态(Kernel Mode)。所以,每个实际的文件I/O操作必须在内核态下进行。

cpu既可以运行在用户态,也可以运行在内核态。一些CPU可以有两种以上的执行状态,如Intel80x86微处理器有四种不同的执行状态。但是,所有标准的Unix内核都仅仅利用了内核态和用户态。

一个程序执行时,大部分时间都处在用户态,只有需要内核所提供的服务时才切换到内核态。当内核满足了用户程序的请求后,它让程序又回到用户态下。

然后,访问文件的模式有多种:

1,规范模式

规范模式下文件打开后,标志O_SYNC与O_DIRECT清0,而且它的内容是由系统调用read()和write()来存取。系统调用read()将阻塞调用进程,直到数据被拷贝进用户态地址空间。但系统调用write()不同,它在数据被拷贝到页高速缓存(延迟写)后就马上结束。

2,同步模式

同步模式下文件打开后,标志O_SYNC置1。这个标志只影响写操作(读操作总是会阻塞),它将阻塞调用进程,直到数据被有效的写入磁盘。

3,内存映射模式

内存映射模式下文件打开后,应用程序发出系统调用mmap()将文件映射到内存中。因此,文件就称为RAM中的一个字节数组,应用程序就可以直接访问数组元素,而不需用系统调用read()、write()或lseek()。

4,直接I/O模式

直接I/O模式下文件打开后,标志O_DIRECT置1。任何读写操作都将数据在用户态地址空间与磁盘间直接传送而不通过页高速缓存。

5,异步模式

异步模式下,文件的访问可以有两种方法,即通过一组POSIX API或Linux特有的系统调用来实现。所谓的异步模式就是数据的传输请求不阻塞调用进程,而是在后台执行,同时应用程序继续它的正常运行。

说道内存映射,就要谈谈操作系统的内存管理了。

以80X86微处理器为例,我们必须区分以下三种不同的地址:

逻辑地址,包含在机器语言指令中用来指定一个操作数或一条指令的地址。每一个逻辑地址都由一个段(segment)和偏移量(offset)组成。

线性地址,也称虚拟地址,是一个32位无符号整数,可以用来表示高达4GB的地址。

物理地址,用于内存芯片级内存单元寻址。

内存控制单元(MMU)通过一种称为分段单元的硬件电路把一个逻辑地址转换成线性地址;接着,第二个称为分页单元的硬件电路把线性地址转换成一个物理地址。

注:
Linux采用4KB页框大小作为标准的内存分配单元。

内存碎片的产生主要是由于请求内存的大小与分配给它的大小不匹配而造成的。一种典型的解决办法是提供按照几何分布的内存区大小,即内存区大小取决于2的幂而不取决于所存放的数据大小。这样,不管请求内存的大小是多少,都可以保证内存碎片小于50%。

内核使用一种新的资源-线性区,实现了对进程动态内存的推迟分配。当用户态进程请求动态内存时,并没有获得请求的页框,而仅仅获得对一个新的线性地址区间的使用权,而这一线性地址区间就成为进程地址空间的一部分。这一区间叫“线性区”。

进程的地址空间由允许进程使用的全部线性地址组成。内核通过所谓线性区的资源来表示线性地址区间,线性区是有起始线性地址、长度和一些访问权限来描述的。

内存映射,内核给这个进程分配一个新的线性区来映射这个文件。一个新建立的内存映射就是一个不包含任何页的线性区,当进程引用线性区中的一个地址时,缺页异常发生。

与进程地址空间有关的全部信息都包含在一个叫做内存描述符的数据结构中,类型为mm_struct。

Linux通过类型为vm_area_struct的对象实现线性区

进程所拥有的所有线性区是通过一个简单的链表按照内存地址升序链接在一起的,其中mmap字段指向链表中的第一个线性区描述符。当进程中的线性区很多时,Linux2.6使用红黑树来存储内存描述符。

进程的地址空间、它的内存描述符以及线性区链表三者之间的关系,如下图所示:

每个Unix进程都拥有一个特殊的线性区-堆,堆用于满足进程的动态内存请求。

For most operating systems, mapping a file into memory is more expensive than reading or writing a few tens of kilobytes of data via the usual read and write methods. From the standpoint of performance it is generally only worth mapping relatively large files into memory.

JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0。

NIO2.0正式提供了异步文件通道和异步套接字通道的实现。对应于Unix网络编程中的事件驱动I/O – AIO。它不需要通过多路复用器-Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。

转自https://blog.csdn.net/donsonzhang/article/details/46626273