下面的11个命令在进行linux网络操作很有用,特意记录下.
curl & wget
curl和wget都可以下载文件
最近读了《the art of readable code》这本书,书的内容是叫你如何写出可读性高的代码。觉得里面的很多例子和观点有很大的参考价值,所以在这篇博客中记录下来。
书中有个很强的观点,代码是给人看的,不是给机器看的。所以写代码要将可读性放到第一位。写代码时要时刻考虑这段代码别人是不是容易阅读。
全书分为四部分,分别为:
这一部分主要是从变量命名、函数命名、注释等方面介绍如何提升代码可读性。
“代码中最困难的两件事情,是命名和缓存失效”。可见命名的困难性。在选择名称时,我们遵循如下原则:
1 | // GetPage函数不能表示page从哪里获取。推荐使用FetchPage和DownloadPage进行替换。 |
单词 | 同义词 |
---|---|
send | deliver,dispatch,announce,distribute,route |
find | search,extract,locate,recover |
start | launch,create,begin,open |
make | add, push, enqueue |
函数参数 | 推荐重构后的内容 |
---|---|
Start(int delay) | delay -> delay_secs |
CreateCache(int size) | size -> size_mb |
ThrottleDownload(float limit) | limit -M max_kbps |
根据变量的作用域选择合适的名称
变量的作用域如果比较长,则变量名称尽量多携带信息;变量作用域如果比较短,则变量名称推荐用简单的、简短的。
英文中一些推荐变量命名做法
1 | 下面的now完全没有必要,直接用后面的内容替换now就行 |
原文链接:https://github.com/jamiehannaford/what-happens-when-k8s
想像一下当你运行下面的命令时
1 kubectl create deployment nginx --image=nginx --replica=3
在一切顺利的情况下,在k8s集群中能够看到生成了3个pod。那么在底层到底发生了什么?本篇文章试图解决一个请求从客户端到kubelet整体的流程,并且也链接了源码。
针对开头提到的命令,在命令行输入回车后,会发生什么呢?
kubectl首先会进行客户端侧的验证。针对非法的命令(比如不支持的资源或镜像不合法)能够快速失败提示,避免发送到kube-apiserver,从而降低k8s负载。
经过验证后,kubectl组装将要发送到api-server的http请求。在k8s中,获取或者改变k8s中的状态都要通过apiserver,apiserver负责与etcd交互。kubectl使用generators来构建http请求,generators是负责序列化的一种抽象(注:可以简单的把generators理解为帮帮助用户构建完成的资源的工具,比如kubectl create中我们只指定了部分的内容,剩下的内容是generators帮助我们生成的)。
kubectl run 命令可以通过使用—generator参数指定运行多种资源,而不仅仅是deployments。如果不使用—generator参数,kubectl可以主动推断要运行的资源。
例如,参数--restart-policy=Always
会触发deployments,而--restrart-policy=Never
会触发pods。kubectl也会根据其他的参数确定需要执行的其他动作。比如记录命令(回滚或审计),或者--dry-run
在确定要创建的资源是Deployment后,kubectl将会使用DeploymnetAppsV1
generator来根据命令行参数构造runtime object。runtime object是k8s resource的通用表示。
k8s使用版本API,并且每个版本有自己所属的API groups。API groups意味着一些相似资源的集合。这比使用单调递增的版本号更加灵活。比如deployment的api group是apps,最近的版本是v1。这也是在deployment中写apiVersion: apps/v1
的原因。
kubectl生成了runtime object后,kubectl开始找到正确的API group和version,并且会组装versioned client。versioned client能够意识到资源的各种REST 语义。这个阶段被称为版本协商,它涉及kubectl扫描远程API上的/apis路径,从而检索所有可能的API组。由于kube-apiserver在此路径上公开了其模式文档(以OpenAPI格式),因此客户端可以很容易执行自己的发现。
为了提高性能,kubectl会将OpenAPI schema缓存到 ~/.kube/cache/discovery 目录中。如果想要看到API 发现的过程,那么可以将这个缓存目录删除,然后运行kubectl命令时带上-v参数。你将会看到查询API versions的所有http请求。
kubectl最后一步是发送http请求。一旦接收到成功的响应后,那么kubectl就会按照期望的格式打印结果。
为了成功发送请求,kubectl需要能够处理认证信息。用户的凭证一般存储在kubeconfig
文件中,但是有时候也在其他地方。kubectl做了下面的事情来决定用户凭证:
--kubeconfig
,那么就使用这个参数指定的文件$KUBECONFIG
,那么使用这个环境变量指定的文件~/.kube
,然后使用此目录中的第一个文件--username
, 那么这些参数将会覆盖凭证文件的配置。一旦有了这些信息,那么kubectl将会用这些信息去修饰http请求:k8s客户端和k8s系统组件与k8s交互主要是通过kube-apiserver来进行,比如获取和存储系统状态。kube-apiserver第一步要做的就是要验证请求的身份信息,这一步叫做认证(authentication).
apiserver如何进行认证呢?首先,apiserver在启动时,它会首先检查启动参数,然后组装一系列的认证器(authenticators)。举个例子,比如--client-ca-file
传递进来,那么会添加x509认证器;如果提供--token-auth-file
,那么会添加token认证器。apiserver收到的每个请求都会经过这些认证器,并且会依次进行认证,直到有一个认证器成功,那么请求就通过了。
--token-auth-file
指定)Authorization
会移除,并且用户信息会添加到上下文中。这样后续的apiserver处理逻辑(比如authorization和admission controllers)能够直接获取到用户信息。经过认证后,apiserver会进行鉴权检查,确定执行的请求动作是否允许。
apiserver处理鉴权跟认证类似。基于启动的参数,apiserver会组装一系列的鉴权器(authorizers), 这些鉴权器会依次检查请求。如果所有的鉴权都没有通过,那么请求会被拒绝并返回给客户端。如果其中某个鉴权器通过,那么请求会继续。
下面展示了一些鉴权器:
plugin/pkg/admission
目录下,每个controller都满足一个比较小的接口定义。然后编译到k8s中。apiserver验证了请求后,下一步apiserver反序列化http请求,从http请求中构建runtime objects(类似于kubectl generators的反过程),然后将objects存储到底层存储中。将这个过程细化如下。
首先,apiserver在接受到请求后是怎么知道如何处理呢?这个过程比较复杂。我们首先看下apiserver在启动时都会做什么:
<namespace>/<name>
,但是这个格式是可以配置的initializerConfiguration
允许你声明哪些资源运行哪些initializers。假设我们想要一个在每个pod创建时都运行的initializer,我们的配置如下:1 | apiVersion: admissionregistration.k8s.io/v1alpha1 |
metadata.initializers.pending
字段添加podimage.example.com
。initializer controller已经提前部署好,initializer会扫描每个新建的pod。当发现pod的metadata.initializers.pending
字段不为空时,initializer controller就会执行。执行完后,会将对应的podimage.example.com
删除。当所有的初始化器执行完毕并且pending
字段为空,则此对象被认为已经初始化完成。includeUninitialized
查询参数来返回所有的对象,包括那些还未初始化的对象。到现在这个阶段,deployment已经存储到etcd中并且初始化逻辑已经完成了。接下来就要设置资源拓扑。deployment就是replicasets的集合,而replicaSet就是pod的集合。k8s如何从http 请求中去创建这些对象呢?这就是k8s内置controller需要做的内容。
k8s在整个系统中大量使用了”控制器”思想。控制器就是一个异步过程,调谐k8s当前的状态到期望状态。每个控制器有自己的职责,并且由kube-controller-manager
组件负责并发运行。下面介绍deployment controller
在 deployment存储到etcd并且初始化完成后,那么deployment就能够被kube-apiserver看到。deployment controller能够检测到可用的deployment资源以及对于资源的变更。在我们的例子中,deployment controller通过informer为创建事件注册了一个特定的回调。
当deployment可用时,handler开始执行,将对象添加到内部的工作队列中。当开始处理这个对象时,控制器会检查deployment,发现deployment没有关联的ReplicaSet或者Pod记录。这是通过使用标签选择器来查询kube-apiserver来完成的。有趣的是,这个同步过程是状态未知的:调谐新的记录跟调谐旧的记录是一样的工作方式。
在意识到相关的ReplcaSet和Pod不存在后,deployment controller会开启scaling process来解析状态。创建一个ReplicaSet资源,给它分配标签选择器,然后给定数字1的版本号。ReplicaSet中的PodSpec是从deployment清单中拷贝而来,类似于其他相关的元数据。有些时候,deployment记录在这个过程后也需要对应的更新(比如,设定了截止日期)
更新deployment中的status字段,然后控制器重新进行调谐过程,然后等待deployment达到期望的状态。由于deployment controller仅仅关心创建ReplicaSets,接下来的工作需要由下一个控制器来完成,既ReplicaSet controller。
在前面的步骤中,Deployments controller创建了ReplicaSet,但是还没有创建Pods。这就是ReplicaSet 控制器发挥作用的时刻。ReplicaSet controller的主要工作就是监视ReplicaSet的生命周期和对应的Pods资源。类似于大多数其他的控制器,它通过在特定事件上触发handler来实现。
我们感兴趣的事件是创建。当ReplicaSet被创建后。ReplicaSet controller检查新的replicaSet的状态,然后发现当前的状态跟期望的状态不一致。它试图通过增加属于ReplicaSet的Pod数量来达到目标状态。它以谨慎的方式来创建它们,确保ReplicaSet的数量始终匹配。
创建Pods的过程是批量处理的,以SlowStartInitiaBatchSize
开始,然后每次都以两倍的数量来启动其他的Pods。这样做的目的是确保当pod创建失败时,避免大量的http 请求到apiserver。如果要失败,可以采用对组件影响最小的方式来进行。
k8s通过Owner References(child资源中的一个字段,用来指向父资源的id)来确保对象的引用关系。这样做,不仅能够确保一旦父资源被删除,子资源也会被删除(级联删除),同时还能够保证父资源不会竞争子资源(想象一下,两个潜在的父资源认为他们拥有同样的子资源)
Owner Reference另一个好处是,它是有状态的。如果控制器重启了,那么重启过程中,不会影响到这些资源,因为资源拓扑是独立于controler的。这种对隔离的关注也渗透到控制器本身的设计中:它们不应该操作它们没有明确拥有的资源。相反,控制器应该只操作它们明确拥有的资源。
然而,有时候会存在”孤儿”资源,比如下面这些情况:
Pending
,因为它们还没有被调度。最后的一个控制器是scheduler,它负责调度pod。NodeName
为空的pod,然后尝试将它们调度到合适的节点上。PodScheduled
设置为True
.注:定制调度器。通过
--policy-config-file
可以定制predicate和priority函数。我们可以在k8s集群中运行特定的调度器,比如采用deployment运行。如果在PodSpec中指定了schedulerName
,那么k8s会将调度过程交给这个调度器执行。
让我们总结下上面的过程:
NodeName
于当前节点名称一致的pod。然后它将pod与本地内部缓存中pod的信息进行比较,如果发现有差异就进行同步。下面来具体看下同步的过程是怎么样的:Pending
,Running
,Succeeded
,Failed
和Unknown
。产生这些状态是比较复杂的,让我们深挖一下:PodSyncHandlers
会顺序执行。每个handler都会检查pod是否应该在当前节点上。如果任意的handler判断pod不应该在当前节点上,那么pod的phase将会变成PodFailed
,并且pod会从此节点上被剔除。比如在Job资源中设置了activeDeadlineSeconds
,超过了设置的时间,那么pod就会被剔除。Pending
PodReady
condition设置为Falsecgroups-per-qos
参数,那么kubelet将会为pod创建cgroups并且会应用这些资源参数。这是为了实现对pod更好地服务质量(QoS)处理。/var/run/kubelet/pods/<podID>
),卷目录(<podDir>/volumes
)以及插件目录(<podDir>/plugins
)Spec.Volumes
中定义的卷准备好。根据挂载卷的类型,有些pod需要等待更长的时间(比如云或NFS卷)。Spec.ImagePullSecrets
中定义的secrets资源,会从apiserver中获取。从而后续会注入到容器中docker
或rkt
)。RunPodSandbox
。沙箱是一个CRI术语,用来描述一组容器,在k8s中被称作pod。1 | { |
CNI_ARGS
环境变量提供名称和命名空间。bridge
CNI插件的工作:host-local
,它从一个预先定义好的地址范围中分配IP地址。它将已经分配的ip地址存储在本地的文件系统中,从而确保在本机上不会重复使用ip地址。resolv.conf
文件被正确设置。ContainerConfig
结构体(包括启动命令、镜像、标签、挂载、环境变量等),数据来自于PodSpec。然后通过protobufs协议发送给CRI插件。对于Docker来说,docker会反序列化请求,然后填充到自己的config结构体,然后发送给docker的后台进程。在这个过程中,它将一些元数据标签(如容器类型、日志路径、沙盒id)应用到容器。Exec
(在容器中执行特定的命令)或Http
(执行一个http请求)。如果PostStart hook需要很长时间运行、或者失败,那么容器将不会到达running状态到这里,已经产生了三个容器,他们可能运行在一个或者多个工作节点上。所有的网络、卷、secret都已经被kubelet填充,并且通过CRI插件填充到了容器中。
redis中字符串是自己定义的结构,名为SDS。不是用的c语言的字符串。
SDS的定义如下:1
2
3
4
5
6
7
8struct sdshdr {
// 记录buf数组中已使用字节的数量
int len;
// 记录buf数组中未使用字节的数量
int free;
// 字节数组,用于存储字符串;其中最后一位保存'\0'字符
char buf[];
}
为什么要自己实现,而不是复用c中对于字符串的定义?
SDS的扩容机制:
redis中链表结构
链表中节点定义1
2
3
4
5
6
7
8typedef struct listNode {
// 前置节点
struct listNode *prev;
// 后置节点
struct listNode *next;
// 节点值
void *value;
} listNode;
链表定义1
2
3
4
5
6
7
8
9
10
11
12
13
14typedef struct list {
// 表头节点
listNode *head;
// 表尾节点
listNode *tail;
// 链表包含的节点数量
unsigned long len;
// 节点值复制函数
void *(*dup) (void *ptr);
// 节点值释放函数
void (*free) (void *ptr);
// 节点值对比函数
int (*match) (void *ptr, void *key);
} list;
注:链表节点使用 void*指针保存节点值,并且通过list中的dup、free、match三个属性为节点值设置类型特定函数,所以链表可以保存各种不同类型的值。
翻译自:https://www.confluent.io/blog/top-5-things-every-apache-kafka-developer-should-know
介绍下面5个内容
- 理解消息传递和持久化保证
- 学习producer api中的新的粘性分区(learn about the new sticky partitioner in the producer API)
- 利用cooperative rebalancing(协同重平衡)来避免消费者组执行rebalance时的stop the world
- 掌握常用命令行工具
- kafka console producer
- kafka consule consumer
- dump log
- delete records
- 使用record headers的能力
- 为kafka记录增加headers
- 检索headers
针对数据持久化,KafkaProducer提供了不同的配置。acks 配置指定了当生产者接受到多少消息确认后,才认为记录已经成功发送到broker上。kafka提供了以下三种选择:
一般来说,设置acks=all, 我们的要求通常都是所有副本都应该确认,或者至少大量的in sync副本应该确认。如果不是这样,那么应该抛出异常知道所有副本都在in sync中。
为了满足这个要求,kafka提供了这样一个配置:min.insync.replicas. 这个配置强制指定多少个副本写成功才被认为真正写成功。需要注意的是,min.insync.replicas配置在是broker或者topic级别,而不是在producer上。min.insync.replicas默认值是1。所以为了避免上面说的情况,在三个副本的情况下,需要将min.insync.replicas设置为2。
上图中展示了in sync中的副本不满足min.insync.replicas要求的情况,此时producer发送的消息,leader broker不会将记录添加到log中,而是会抛出NotEnoughReplicasException 或者 NotEnoughReplicasAfterAppendException。副本与leader不一致被认为是一种可以重试的错误,所以producer会重试直到成功或者达到超时时间(默认值两分钟)delivery.timeout.ms。
如果需要非常高的数据持久化保证,那么应该同时设置min.insync.replicas和acks=all.
kafka需要partition来提升吞吐量并且将消息均衡到不同的broker上。kafka的消息记录是key/value格式,其中key可以为null。kafka producer在发送消息时,不会立即发送,而是将消息放置到对应的partition batch中(类似缓存),待缓存满了,在一次发送。batch是一种增加网络利用的有效方式。在将消息发送到partition中,通常有三种方式来决定发送到哪个partition上。
下图展示了方式3:
round robin方法对于将消息均衡到不同的partition上工作的很好。但是存在一个缺点,由于producer是依次将消息发送到不同的partition batch中,那么有可能会出现每个partition中的batch都填充不满。比如下面展示的,topic有三个partition。假设应用产生了9条消息,并且消息没有key,所有的消息几乎是同时发送,如下图:
9条记录分散到是三个batch中,每个batch有三条。但是如果我们将9条消息放到一个batch中会更好。更少的batch使用更少的网络带宽并且对于broker的负载更小。
kafka 2.4.0新增了sticky partitioner approach. 这种方法能够将消息发送到一个partition的batch中直到此batch满了。然后,发送这个batch,sticky partitioner使用下一个partition的batch。如下图展示了使用sticky partitioner的例子:
通过使用sticky partitioner方法,我们减少了请求次数,同时也减少了请求队列上的负载,也减少了系统延迟。需要注意的时,sticky partitioner仍然是将消息均衡放置到不同的partition batch中。可以将这种认为是per-batch round robin 或者 eventually even approach。
如果想要更多了解sticky 模式,可以参考 Apache Kafka Producer Improvements with the Sticky Partitioner
kafka是一个分布式系统,而分布式系统中一个重要的事情就是如何处理失败。kafka处理失败的方式之一是使用consumer group,consumer group管理多个consumer。如果其中一个consumer停止,kafka会进行rebalance从而确保另一个consumer能够接管这个工作。
从2.4版本开始,kafka引入了一个新的rebalance协议,cooperative rebalancing。在深入了解cooperative rebalancing之前,先来了解一下consumer group的基础。
假设一个分布式应用(比如一个微服务的多个副本)有个多个consumer,订阅同一个topic。这些consumer组成了一个consumer group,具有同样的.group.id。在consumer group中的每个consumer负责从一个或多个partition中消费消息。这些partition的分配是由consumer group中的leader进行的。如下图所示:
从图中可以看到,总共有6个partition,在理想的情况下,每个consumer负责消费两个partition。但是如果其中的某个应用失败了或者不能连接网络。那么对应的partition中的消息是不是就不能被消费直到应用恢复?幸运的是,由于consumer rebalancing协议的存在,不会发生这种情况。
下图展示了consumer group protocal过程:
如上图,consumer2由于某些原因失败了。group coordinator将它从组中移除然后触发rebalance。rebalance尝试将工作负载在组内所有工作的consumer上进行均衡分布。在这个例子中,consumer2离开了组,rebalance会将consumer2拥有的partition分配给组内其他的consumer。所以对于一个consumer group,如果其中consumer失败了, 那么对于这些partition的处理不会产生影响。
但是,默认的rebalance协议有个缺点。在rebalance过程中,每个consumer都会放弃之前获得的partition(这会造成consumer停止消费),知道topic下所有的partition都被重新分配。这种情况被称为stop the world rebalance。为了解决这个问题,依靠ConsumerPartitionAssignor实例,consumer简单的重新获取之前分配的partition,所以在这些partition上仍然能够继续消费。
上述描述的实现被称为eager rebalancing, 因为它优先考虑的是针对一个consumer group中,不会有两个consumer同时对于一个partition拥有主权。
虽然对于同一个topic下的某个partition不能具有相同的consumer非常重要,但是有一种更好的方法,既能够提供安全性同时还不会暂停处理,既incremental cooperative rebalancing。这个方法在kafka2.3版本的kafka connect中被首次引入,现在已经在consumer group 协议中实现了。利用cooperative 方法,消费者不会在rebalance开始时主动放弃partition的所有权。在cooperative方法中,consumer group中的所有成员会将当前的分配进行编码然后将信息发送到group leader中。group leader决定那个partition需要修改对应的consumer。而不是一开始就完全从新分配。之后第二次rebalance发起,但是这一次,仅仅涉及到那些需要改变所有权的分区。这有可能是撤销不在用的partition或者新增的partition。对于那些没有改变所有权的分区,这些分区中的数据会继续进行处理。
这种处理办法解决了stop-the-world,而仅仅是暂停了哪些需要修改所有权分区的消费。这带来了更少的rebalance代驾以及降低了完成rebalance的时间。即使rebalance时间很长也没有关系,因为现在数据仍然被处理。使用CooperativeStickyAssignor能够开启这个功能。
如果要开启这个功能,则需要将partition.assignment.strategy设置为使用CooperativeStickyAssignor。这种设置完全是在客户端测,所以仅仅更新客户端版本即可。而在Kafka Stream中,这个功能是默认开启的。
下面介绍了4种在平时工作中使用最多的工具。
1 | # 开启发送者程序, 发送的消息只有value,没有key |
1 | # 消费指定topic中的消息 |
1 | # 指定打印topic为example-0中的日志,参数--print-data-log表示输出日志 |
kafka提供了配置来控制数据保留,包括时间和数据大小
如果想要删除数据,可以使用下述命令:1
2kafka-delete-records --bootstrap-server <broker-host:port> \
--offset-json-file offsets.json
offsets.json 文件内容如下:1
2
3
4
5
6{
"partitions": [
{"topic": "example", "partition": 0, "offset": -1}
],
"version":1
}
参数介绍如下:
Record headers可以给kafka消息添加一些元数据,并且不是给消息的key value添加额外的信息。比如如果你想要在消息中嵌入一些信息,如表示消息来源系统,也是是想要增加一些审计功能。
为什么不能将这些额外的数据添加到key中。因为给key中添加数据会带来两个潜在的问题
我们了解了kafka的五个tips,我们理解了下面的知识点
本篇内容主要翻译自Efficient data transfer through zero copy,包括有些自己的思考
很多网页应用有大量的静态内容,针对这些静态内容主要的处理逻辑就是从磁盘读取数据,然后将数据写入到响应的socket中。这项工作应该需要较少的CPU资源。但是有时候并不是。内核从磁盘读取数据,然后将数据拷贝到应用中。之后应用将数据写入到内核,然后推送到socket中。实际上,应用程序在这里扮演了一个无效率的中间层,既将数据从磁盘写入到socket。
每一次当数据在用户空间和内核空间转换时,数据都会被拷贝,而这会消耗cpu cycles以及内存带宽。幸运的是,我们可以采用zero copy技术来避免内核和应用程序之间的数据拷贝。应用程序使用zero copy技术来请求内核直接将数据从磁盘文件拷贝到socket中,而不需要经过应用程序。zero copy技术能够极大的提升应用程序性能并且减少内核空间和用户空间之间的切换。
Java中使用java.nio.channels.FileChannel
中的transferTo()
方法在linux和Unix系统重实现zero copy。使用transferTo()
方法能够直接将字节数据从一个channel写入到另一个channel中,而数据不需要经过应用程序。本篇文章首先展示使用传统的拷贝方法消耗的资源,然后展示使用zero copy获得的性能提升。
想想一个简单的场景,从一个文件读取数据,然后将数据通过网络写入到另一个程序中。核心操作如下所示。1
2File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);
虽然看起来很简单,但是这个操作需要再内核空间和用户空间4次上下文切换,以及4次数据拷贝。下图展示了具体过程
下图展示了上下文切换过程
主要的步骤如下:
read()
方法从user mode 转换到 kernel mode。在read内部是发一起了一次系统调用sys_read()
从文件读取数据。第一次数据拷贝是由DMA引擎来执行,DMA从磁盘读取文件,然后将数据保存到内核缓冲区中。read()
方法返回。这引起了第二次的上下文切换。现在数据是在用户空间中的缓冲区中。send()
方法调用引起用户空间到内核空间的切换。这次会将数据从用户空间拷贝到内核缓冲区。这一次数据是放置到另外一个内核缓冲区中,与目的socket相关联。send()
方法返回,又引起了一次上下文切换。DMA将数据从内核缓冲区拷贝到网卡缓冲区中,这是第四次数据拷贝。我们为什么不直接将数据拷贝到用户空间,而要经过内核空间呢?首先因为用户进程没有办法直接读取硬盘,这是基于保护的目的,都要经过操作系统的处理,才能读取到硬盘内容;其次这是因为操作系统引入内核缓冲区是为了提升性能。操作系统读取数据都会预读取一些数据,这样在应用程序读取额外的数据时,可以不用发起系统调用从硬盘中读取,而是直接从内核缓冲区获取即可。而写入过程,可以完全实现为异步过程。既应用程序只需要将数据写入到内核缓冲区中,而不是写入到磁盘中。
但是,设想当前应用程序需要处理的数据要远远大于内核空间缓冲区的大小。而此时,数据需要在磁盘,内核缓冲区,用户空间中来回拷贝。这会严重影响性能。
Zero copy技术是解决这个问题的方法。
如果你仔细检查上面的过程,你会发现第二次和第三次数据拷贝可以省略。应用程序针对这些数据什么也不做。因此数据可以被直接从内核缓冲区拷贝到socket buffer中。transferTo()
方法可以完成这个操作。下面展示了此方法1
public void transferTo(long position, long count, WritableByteChannel target);
transferTo()
方法将数据从文件channel拷贝到target channel中。这个方法依赖底层操作系统对于zero copy的支持。在UNIX或linux中,使用的是sendfile()
系统调用。如下所示:1
2#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
下图展示了transferTo()
方法执行过程
下图展示了具体的上下文切换
transferTo()
方法执行过程如下:
我们将上下文切换从4次降低到2次,数据拷贝从4次降低到3次(其中仅有一个数据拷贝需要CPU参与,图中序号2)。但是这还没有达到zero copy的目的。如果底层网卡支持gather操作,那么我们可以减少内核空间中的数据重复。在linux kernel2.4及以后版本中,socket buffer已经支持了这个操作。这个方法不仅仅减少了上下文切换并且也消除了CPU参与的数据拷贝。具体如下:
transferTo()
方法将文件内容拷贝到内核缓冲区,由DMA引擎执行transferTo()
使用java实现文件传输,采用传统的IO和nio来分别实现进行对比。完整代码参考.其中客户端是主要的视线,服务端仅仅读取数据。
1 | // 1. create socket and connect to server |
1 | public void testSendfile() throws IOException { |
性能比较如下:
file size | traditional(ms) | nio(ms) |
---|---|---|
12MB | 50 | 18 |
221MB | 690 | 314 |
2.5G | 15496 | 2610 |
评测环境:
本篇文章主要介绍读写锁的一些原理及实现。翻译原文地址
假设一个java应用程序需要读以及写一些资源,但是写的频率要远远低于读。多个读线程读取资源不会有什么问题。但是如果一个线程想要写资源,那么同时就不能有其他线程读或写这个资源。为了能够允许多个读线程和一个写线程,我们需要读写锁。
虽然Java中提供了读写锁的实现,但是我们还是要知道读写锁背后的原理,这样才能在实际使用中处理具体的问题。
总结一下对于获取资源的读权限和写权限可以如下:
读权限:当没有其他线程写当前资源或者没有其他线程请求当前资源的写权限,那么当前线程就能够获取资源的读权限
写权限:如果没有其他线程读或者写当前资源,当前线程就能获取资源的写权限
只要没有其他线程正在写资源或者没有其他线程请求写资源,那么当前线程就能够读取资源。如果读线程发生的很多,但是又没有提升写线程的优先级,那么就可能发生”饥饿”现象。
本篇主要介绍线程之间如何进行信号的通知。同时介绍wait,notify底层的一些实现。
最简单的进行线程之间通知的方式就是采用共享变量的方式。比如下面的代码1
2
3
4
5
6
7
8
9public class MySignal{
protected boolean hasDataToProcess = false;
public synchronized boolean hasDataToProcess(){
return this.hasDataToProcess;
}
public synchronized void setHasDataToProcess(boolean hasData){
this.hasDataToProcess = hasData;
}
}
线程A和线程B共享同一个MySingal实例,当线程A处理好数据后,可以设置hasDataToProcess属性为true,然后线程B获取到此属性。从而完成线程之间的信号通知。当然,如果线程A和线程B不是在同一个MySingal实例上进行的,则不能进行信号的传递。
在采用MySingal的例子中,一般会采用下面的代码来判断一个线程是否可以进行处理了。1
2
3
4protected MySignal sharedSignal = new MySingal()
while(!sharedSignal.hasDataToProcess()){
//do nothing... busy waiting
}
从代码中可以看到,检测hasDataToProcess属性是在一个while循环中,如果hasDataToProcess为false,这就会造成线程一直在执行while语句,造成忙等待。这会造成CPU资源的浪费。
一般在Java中,我们一般会采用wait,notify或notifyAll进行线程之间信号的传递。线程调用某一个对象上的wait方法前,必须首先获取该对象上的锁,才能执行此对象上的wait方法。在调用notify或者notifyAll之前,也是要获取对应对象上的锁。调用wait方法时,会释放此对象上的锁,线程进入阻塞状态,等待信号。而调用notify后,会随机唤醒一个同对象上的线程,但是必须是退出了notify对应的synchronized块后,被唤醒的线程才能继续执行,因为被唤醒的线程还要获取对象上的锁。如下面的代码所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class MonitorObject{
}
public class MyWaitNotify{
MonitorObject myMonitorObject = new MonitorObject();
public void doWait(){
synchronized(myMonitorObject){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
}
public void doNotify(){
synchronized(myMonitorObject){
myMonitorObject.notify();
}
}
}
从代码中,可以看到在myMonitorObject上调用wait方法之前,会先获取myMonitorObject上的锁。在调用notify方法之前,也是要先获取myMonitorObject上的锁。在下面的内容中我会简单介绍wait和notify的底层原理。
在JDK1.8中,Object对象中有三个wait(),wait(long timeout),wait(long timeout, int nanos)这三个方法,它们的主要区别是后两个方法增加了等待的时间。
下面是JDK1.8中关于wait(long timeout)方法的描述:
/**
* Causes the current thread to wait until either another thread invokes the * {@link java.lang.Object#notify()} method or the * {@link java.lang.Object#notifyAll()} method for this object, or a * specified amount of time has elapsed. * <p> * The current thread must own this object's monitor. * <p> * This method causes the current thread (call it <var>T</var>) to * place itself in the wait set for this object and then to relinquish * any and all synchronization claims on this object. Thread <var>T</var> * becomes disabled for thread scheduling purposes and lies dormant * until one of four things happens: * <ul> * <li>Some other thread invokes the {@code notify} method for this * object and thread <var>T</var> happens to be arbitrarily chosen as * the thread to be awakened. * <li>Some other thread invokes the {@code notifyAll} method for this * object. * <li>Some other thread {@linkplain Thread#interrupt() interrupts} * thread <var>T</var>. * <li>The specified amount of real time has elapsed, more or less. If * {@code timeout} is zero, however, then real time is not taken into * consideration and the thread simply waits until notified. * </ul> * The thread <var>T</var> is then removed from the wait set for this * object and re-enabled for thread scheduling. It then competes in the * usual manner with other threads for the right to synchronize on the * object; once it has gained control of the object, all its * synchronization claims on the object are restored to the status quo * ante - that is, to the situation as of the time that the {@code wait} * method was invoked. Thread <var>T</var> then returns from the * invocation of the {@code wait} method. Thus, on return from the * {@code wait} method, the synchronization state of the object and of * thread {@code T} is exactly as it was when the {@code wait} method * was invoked. * <p> * A thread can also wake up without being notified, interrupted, or * timing out, a so-called <i>spurious wakeup</i>. While this will rarely * occur in practice, applications must guard against it by testing for * the condition that should have caused the thread to be awakened, and * continuing to wait if the condition is not satisfied. In other words, * waits should always occur in loops, like this one: * <pre> * synchronized (obj) { * while (<condition does not hold>) * obj.wait(timeout); * ... // Perform action appropriate to condition * } * </pre> * (For more information on this topic, see Section 3.2.3 in Doug Lea's * "Concurrent Programming in Java (Second Edition)" (Addison-Wesley, * 2000), or Item 50 in Joshua Bloch's "Effective Java Programming * Language Guide" (Addison-Wesley, 2001). * * <p>If the current thread is {@linkplain java.lang.Thread#interrupt() * interrupted} by any thread before or while it is waiting, then an * {@code InterruptedException} is thrown. This exception is not * thrown until the lock status of this object has been restored as * described above. * * <p> * Note that the {@code wait} method, as it places the current thread * into the wait set for this object, unlocks only this object; any * other objects on which the current thread may be synchronized remain * locked while the thread waits. * <p> * This method should only be called by a thread that is the owner * of this object's monitor. See the {@code notify} method for a * description of the ways in which a thread can become the owner of * a monitor. * * @param timeout the maximum time to wait in milliseconds. * @throws IllegalArgumentException if the value of timeout is * negative. * @throws IllegalMonitorStateException if the current thread is not * the owner of the object's monitor. * @throws InterruptedException if any thread interrupted the * current thread before or while the current thread * was waiting for a notification. The <i>interrupted * status</i> of the current thread is cleared when * this exception is thrown. * @see java.lang.Object#notify() * @see java.lang.Object#notifyAll() */ public final native void wait(long timeout) throws InterruptedException;
从代码的注释中,我们可以获得以下一些内容:
在JVM实现获取对象上的锁,是通过monitor进行实现的。图示如下:
当一个线程需要获取 Object 的锁时,会被放入 EntrySet 中进行等待,如果该线程获取到了锁,成为当前锁的 owner。如果根据程序逻辑,一个已经获得了锁的线程缺少某些外部条件,而无法继续进行下去(例如生产者发现队列已满或者消费者发现队列为空),那么该线程可以通过调用 wait 方法将锁释放,进入 wait set 中阻塞进行等待,其它线程在这个时候有机会获得锁,去干其它的事情,从而使得之前不成立的外部条件成立,这样先前被阻塞的线程就可以重新进入 EntrySet 去竞争锁。
乍一看,notify和notifyAll的区别很简单,就是notify只能随机选择一个处于等待状态的线程进行唤醒;而notifyAll可以唤醒所有处于等待状态下的线程,但是也是只有一个线程能够继续执行。
如果结合上面的图片,我们就能更好地进行理解。当线程在对象上调用notify方法时,随机选择一个处于等待状态的线程,并且把该线程放置在该对象的EntrySet列表中。而如果调用的是notifyAll方法时,所有的处于等待状态下的线程都会进入到EntrySet中,从而多个线程进行对象上的锁。notify有点类似于网络中的单播,而notifyAll类似于多播。
设想一下这种情况,有一个线程首先调用了notify方法,然后其他的线程调用了wait方法。那么处于wait下的线程将不会接受到之前线程发送的notify信号。如下面代码所示,代码地址:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class WaitNotifyMissSingal {
public static void main(String[] args) {
final Object lock = new Object();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread B is waiting to get lock");
synchronized (lock) {
System.out.println("thread B get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.notify();
System.out.println("thread B do notify method");
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread A is waiting to get lock");
synchronized (lock) {
try {
System.out.println("thread A get lock");
TimeUnit.SECONDS.sleep(1);
System.out.println("thread A do wait method");
lock.wait();
System.out.println("wait end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
运行上面的代码,程序会一直运行下去。那么如何解决这个问题呢,其实在Object的wait的方法注释中也有对应的说明。我们可以把通知信号保存在信号类的成员变量中。代码地址1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class MyWaitNotify2{
MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;
public void doWait(){
synchronized(myMonitorObject){
if(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}
public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}
在上面的代码中,将信号保存在了wasSingalled变量中,只要调用了notify方法,就是将wasSignalled设置为true,表示有线程执行了notify。在doWait方法中,如果wasSignalled为false,则当前的线程会执行wait方法,进入等待状态;而当wasSignalled为true时,不会执行wait方法,只会将wasSignalled设置为false。
因为一些底层操作系统的原因,具体的可以查看unix操作系统相关内容。简单理解,就是当线程没有接受到唤醒信号时,而线程被错误的唤醒。为了防止伪唤醒,一定要在while循环中检查信号变量的值。这样的循环也叫自旋锁。代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class MyWaitNotify3{
MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;
public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}
public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}
请看下面的例子1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class MyWaitNotify{
String myMonitorObject = "";
boolean wasSignalled = false;
public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}
public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}
从代码中我们可以看到,在上锁时是对myMonitorObject对象上锁,也是在myMonitorObject上调用wait方法,而myMonitorObject是一个空的字符串。我们知道,JVM会将相同的字符串看成同一个对象。也就说说,如果有两个MyWaitNotify实例,则这两个实例中的myMonitorObject是指向同一个对象的。那么一个在实例MyWaitNotify上调用wait的线程会被在另一个实例MyWaitNotify上调用doNotify方法的线程唤醒。下面的图表示了实例的成员指向了相同的对象。
在上图中,4个线程是在相同的String常量上调用wait和notify方法,但是信号wasSignalled仍然是单独存放在对应的实例对象上。即一个在MyWaitNotify1实例上调用doNotify方法的线程可能会唤醒在MyWaitNotify2实例上等待的线程,但是唤醒信号仍然是单独保存在MyWaitNotify1实例中。
如果仔细看上面的程序,发现当在第二个MyWaitNotify2实例上调用doNotify方法时,会唤醒线程A或者B,但是由于在while循环中的wasSignalled变量,对于MyWaitNotify1实例仍然是false。所以被唤醒的线程A或者B从wait启动,但是会再次进入while循环调用wait,再次进入阻塞状态。这跟伪唤醒很像。
由于在doNotify方法中调用的notify方法,此方法不像notifyAll方法,notify方法只会唤醒一个线程,如果是线程C调用的doNotify,本来想唤醒的是线程D。但是有可能会错误的唤醒线程A或B,并且线程A或B会修改对应实例上的wasSignalled变量。而发给D的信号就丢失了,就有点像信号丢失的情况。如果将doNotify方法中的notify替换成notifyAll就不会有这个问题。但是这是一个坏主意,当应用仅仅只需要唤醒一个线程时,没有任何理由要把所有的线程都唤醒。
注意,对于wait/notify的情况,永远不要使用全局的对象例如string常量。在wait和notify上使用的对象针对对应的实例必须是独一的。
在平常进行java并发程序的开发过程中,synchronized关键字的出现频率很高。但是synchronized底层是如何实现的,synchronized都有哪些具体的用法。本篇将会在下面进行详细讲解。
synchronized关键字主要是用来进行同步操作。synchronized关键字修饰的内容,每次都只能有一个线程进入,如果其他线程想要进入相同的代码块,那么必须等前一个线程释放代码块对应的锁,其他的线程才能进入此代码块。但是同一个线程能够多次进入一个相同的同步块,也就是synchronized具有可重入锁的特性。
总的来说,在java中,主要在三个地方使用synchronized关键字
1 | public class MyClassInstance { |
在上面的代码中展示了,synchronized应用在实例方法的方式。当synchronized用在实例方法上时,每当线程进入此方法时,会尝试获取对应的类实例上的锁(注意是类实例)。如果没有其他的线程持有此实例上的锁,那么线程会获取此实例锁,然后运行此方法。
下面的图显示了使用javap进行反编译后的内容(运行:javap -v MyClassInstance.class
):
图中仅仅展示了add方法反编译后的内容,可以看到在方法的flags标记中出现了ACC_SYNCHRONIZED。这个就是表明前面方法是用synchronized关键字修饰的。这个跟同步代码块的反编译结果是不同的。但是在线程执行同步操作时,都是要获取对应对象上的锁。
1 | public class MyClassStatic { |
上面代码展示了如何在静态方法上使用synchronized关键字。当线程尝试进入类的静态方法时,会尝试获取类上的锁(注意是类上的锁,跟类实例上的锁不同的东西)。如果没有其他线程持有此类上的锁,那么当前线程会获取此类上的锁,然后运行此方法。
下面展示了反编译的结果(运行命令:javap -v MyClassStatic.class
)
可以从反编译的结果中看到在方法的flags中也是包含了ACC_SYNCHRONIZED.
如果synchronized关键字用在代码块上,会在其之后括号中的对象获取锁。1
2
3
4
5
6
7
8
9
10
11
12public class MyClass {
public void log(String msg) {
synchronized(this) {
System.out.println(msg);
}
}
public static void func(String msg) {
synchronized(MyClass.class) {
System.out.print(msg);
}
}
}
在上面的代码中,在方法log中,当线程进入log方法,然后执行synchronized关键字修饰的代码块。线程会尝试获取当前类实例上的锁,因为括号中使用的是this关键字。而在func方法中,线程会尝试获取MyClass类的锁。注意这两个锁是不同的。所以一个线程可以执行log中的同步代码块,而同时另一个线程也可以执行func中的同步代码块。
反编译结果如下(运行命令:javap -v MyClass.class
):
可以看到在方法的签名中是没有ACC_SYNCHRONIZED的。但是在代码中出现了monitorenter和monitorexit。
monitorenter:
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.
引用中的内容来自JVM规范。这段话的大概意思为:
每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
monitorexit:
The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.
这段话的大概意思为:
执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
通过这两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。
一个monitorenter都会对应有一个monitorexit。但是我们从反编译的结果中,可以看到多出了一个monitorexit,即第18行。因为在synchronized中的代码遇到异常时,会释放锁。第一个 monitorexit 指令如果正确执行,会走到下面的 goto 指令,直接跳转到 21 行 return,而如果发生异常,下面的 astore_3 和 aload_2 指令会继续执行异常问题,下一步会继续执行 monitorexit 指令退出同步。
当然,有时候,我们也可以这样用synchronized关键字修饰代码块1
2
3
4
5
6
7
8
9
10
11
12
13
14public class MyClass2 {
private Object obj = new Object();
private static Object obj2 = new Object();
public void log(String msg) {
synchronized(obj) {
System.out.print(msg);
}
}
public static void func(String msg) {
synchronized(obj2) {
System.out.print(msg);
}
}
}
在上面的代码中,log方法中的同步块是对obj实例进行加锁,注意每个MyClass2实例中的obj都是不同的。而func中的同步块是对obj2静态成员进行加锁。
Java内存模型规定了Java虚拟机如何跟计算机的内存协同工作。因为Java虚拟机模拟了计算机,所以自然Java虚拟机包括内存模型。
正确理解Java内存模型对于编写正确的并发程序非常重要。Java内存模型规定了线程何时以及怎么读取其他线程写的值,还有就是在获取共享变量时如何进行同步操作。
最初的Java内存模型是有缺陷的,因此在Java5中进行了修改,并且这个版本的Java内存模型一直到Java8都在使用。
JVM中的内存模型将内存分为线程栈内存和堆内存。下面的图从逻辑上展示了Java内存模型:
每个在JVM中运行的线程都有它自己栈空间。线程的栈空间中包含了线程调用方法执行到那一刻的数据。随着线程执行它的代码,调用栈也随之改变。
线程的栈空间同样也包含每个执行的方法的局部变量。线程只能获取它自己的栈空间,其中包含的局部变量对于其他线程是不可见的。即使两个线程执行相同的代码,这两个线程也是在各自的线程栈空间中创建各自的局部变量。
所有的原型类型(boolean,byte,short,char,int,long,float,double)的局部变量都是存储在线程的栈空间中。一个线程可能传递一个原型变量的副本给另一个线程,但是另一个线程并不能共享这个原型变量。
不管哪个线程创建了对象,这些对象都是存储在堆空间中。这也包括原型类型的包装器类型(e.g. Byte, Integer)。不管对象是被分配给局部变量还是作为另一个对象的成员变量,这个对象都是存储在堆空间中。
下面的图中说明了调用栈和局部变量存储在线程的栈空间中,而对象存储在堆空间中:
如果局部变量是原型类型,那么这个变量在线程的栈空间中。
如果局部变量是一个指向对象的引用类型,那么这个引用是在线程的栈空间,但是对象本身是在堆空间中。
如果一个对象包含方法,同时这些方法包含局部变量。那么这些局部变量(原型类型或者引用)是保存在线程栈空间中,即使这些方法所在的对象是存储在堆空间中。
一个对象的成员变量是跟对象一起存储在堆内存中,不管这个成员变量是原型还是指向一个对象的引用。
类的静态变量也是跟类的定义一起存在堆内存中。
堆空间中对象能够被所有拥有指向此对象的引用的线程访问到。当一个线程能够访问一个对象时,那么这个线程也能够访问此对象的成员变量(这里要看这个对象的封装性)。如果两个线程同时调用了同一个对象的一个方法,那么这两个线程将能够访问这个对象的成员变量,但是每个线程都会获得局部变量的一份拷贝。
下面的图说明了上面描述的内容:
在上图中,两个线程有一系列的局部变量。其中一个局部变量(Local Variable 2)指向了堆内存上的共享对象(Object3)。这两个线程分别拥有一个指向同一个对像的引用,这两个引用是不同的。这些引用是局部变量,所以存储在各自线程的栈空间中。而这两个不同的引用指向了堆上的同一个对象。
注意到共享对象(Object3)有两个引用指向了Object2和Object4,如图中箭头所示。通过Object3中的成员变量引用,这两个线程能够获取Object2和Object4。
The diagram also shows a local variable which point to two different objects on the heap. In this case the references point to two different objects (Object 1 and Object 5), not the same object. In theory both threads could access both Object 1 and Object 5, if both threads had references to both objects. But in the diagram above each thread only has a reference to one of the two objects.
上图中同样了展示了一个局部变量指向堆上两个不同的对象。指向不同对象(Object1, Object5)的引用不是同一个对象。理论上,如果这两个线程有指向这两个对象的引用,那么这两个线程都能够访问Object1和Object5。但是在上图中每个线程只有指向其中一个对象的引用。
那么,在Java代码中如何反应上面的内存图呢?代码很简单,如下所示:
1 | public class MyRunnable implements Runnable() { |
1 | public class MySharedObject { |
如果两个线程执行run()方法,那么此代码就表明了上图所示的内存分布。run方法首先调用methodOne方法,然后methodOne方法调用methodTwo方法。
在methodOne方法中定义了一个原型的局部变量localVariable1,同时定义了一个指向对象的引用localVariable2.
每个执行methodOne方法的线程都会在各自的线程栈空间中创建localVariable1和localVariable2的副本。localVariable1在两个线程中是完全独立的,仅仅存在于对应线程的栈空间中。一个线程不能看到其他线程对于localVariable1变量的修改。
执行methodOne方法的线程同样会创建localVariable2的副本。但是,这两个localVariable2的副本是指向在堆上的同一个对象。而localVariable2引用指向的对象是一个类的静态成员变量。而类的静态变量在堆中只存在一份。所以两个localVariable2指向同一个MySharedObject对象的实例,MySharedObject实例存储在堆内存上,对应图中的Object3对象。
我们在来看看methodTwo方法是如何创建localVariable1局部变量的。这个局部变量是指向一个Integer对象的引用。methodTwo方法每次都重新创建一个Integer实例。局部变量localVariable2引用是存储在对应的栈空间中,而对应的Integer对象是在堆内存中,因为每次执行methodTwo方法都会创建Integer对象,所以在堆内存中会有两个Integer对象。即对应于图中的Object1和Object5对象。
在MySharedObject中有两个long类型的局部变量,因为这些变量是类的成员变量,所以它们跟具体的对象一起保存在堆内存中。仅仅局部变量是保存在栈内存中的。
硬件内存跟Java内存模型是有些不一样的。为了更好地理解Java内存模型,我们需要了解硬件内存架构。下面的图简单描述了现代计算机的硬件架构:
现代的计算机通常会有两个以上的CPU,同时这些CPU可能有多个核。这也意味着我们可以将多个线程同时运行在多个CPU上。在给定的时间点,每个CPU都能运行一个线程。即如果你的Java程序是多线程的,那么能够在多个CPU上同时运行(并行)。
在上图中我们可以看到每个CPU内部都有一系列的寄存器。CPU在寄存器上执行的操作要比在主存中的操作快。这是因为CPU能够更快的获取寄存器上的内容。
每个CPU都有一个对应的缓存内存。获取缓存中的内容要快于获取主存中的内容,但是没有获取寄存器中的内容快。CPU缓存的速度要介于寄存器和主存之间。有些CPU可能会有多级的缓存。
一个计算机包含一个主存(RAM),所有的CPU都能够获取主存中的内容。主存的容量要远远大于缓存的容量。
如果一个CPU要读取主存的内容,通常只会读取主存中部分区域的内容到CPU缓存中,然后在从缓存读取到寄存器中,之后进行计算。当CPU需要写结果到主存中,它会将寄存机中的值刷新到缓存中,然后在之后的某个时间点,在将缓存中的内容刷新到主存中。
当CPU需要存储缓存中的值时,会将缓存中的值刷新到主存中。同时CPU缓存也可以局部的刷新缓存值以及写出缓存值。
像前面说明的,Java内存模型和硬件内存架构是不同的。硬件内存架构不会区分线程栈和堆空间。在硬件中,线程栈和堆空间都是在主存中的。同时,部分线程栈和堆内存会出现在CPU缓存或者CPU寄存器中。下面的图进行了说明:
当对象和变量存储在不同的内存区域时,会出现一些问题。主要的两个问题如下:
下面会依次介绍这两个问题。
如果两个线程共享一个对象,但是没有采用合适的volatile关键字或者同步操作,那么当一个线程更新共享对象时,可能另一个线程并不能看到更新后的值。
想象一下,一个共享对象最初是在主存中,一个CPU上的线程读取了此对象到CPU缓存中,之后对于这个共享对象进行了修改。只要CPU缓存没有刷新到主存中,那么运行在其他CPU上的线程是不能读取到修改后的共享变量的值的。这会造成其他CPU只能看到修改之间的值。
下面的图说明了这种情况。在左边的CPU上运行的线程将共享对象读取到CPU缓存中,然后将它的count变量修改为2.由于没有将count的修改刷新到主存中,所以在右侧CPU上运行的线程不能看到这个修改。
为了解决这个问题,我们可以使用Java中的volatile关键字.volatile关键字的作用是保证每次都是从主存中读取变量的值,同时如果修改了此变量,那么此变量会立刻写回到主存中。
如果两个线程或多个线程共享一个对象,那么当多余一个线程修改此变量时,竞态条件就可能会出现。
想象一下,线程A将一个共享对象的count变量读取进CPU缓存,线程B也将count读取到另一个CPU缓存。现在线程A在count加上1,同时线程B也在count上加上1.现在变量被增加了两次,在每个CPU缓存中各一次。
如果加1操作是顺序进行的,那么count的值会加上2,然后写回到主存中。
但是,这两个操作是在没有正确同步的情况下同时进行的。尽管线程A或者B会将count修改后的值写回到主存,但是更新后的值始终比原来的值大1.
下面的图说明了上面描述的竞态条件:
为了解决这个问题,可以使用Java的同步块,即synchronized关键字。同步块保证了在同步块中获取到的变量都是从主存中读取的,并且当线程离开同步块时,所有修改的变量会被刷新到主存中,而不管这个变量是否被volatile声明。