个人博客


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

linux有关网络操作的11个命令

发表于 2024-06-25 | 分类于 Linux

下面的11个命令在进行linux网络操作很有用,特意记录下.

原文链接:https://github.com/oldratlee/translations/blob/master/how-to-work-with-network-from-linux-terminal/README.md

curl & wget

curl和wget都可以下载文件

'编写可读代码的艺术'读书笔记

发表于 2024-06-10

最近读了《the art of readable code》这本书,书的内容是叫你如何写出可读性高的代码。觉得里面的很多例子和观点有很大的参考价值,所以在这篇博客中记录下来。

书中有个很强的观点,代码是给人看的,不是给机器看的。所以写代码要将可读性放到第一位。写代码时要时刻考虑这段代码别人是不是容易阅读。
全书分为四部分,分别为:

  • 代码外观上的改进
  • 简化循环和代码逻辑
  • 重新组织代码
  • 单元测试可读性

代码外观上的改进

这一部分主要是从变量命名、函数命名、注释等方面介绍如何提升代码可读性。

命名和注释

“代码中最困难的两件事情,是命名和缓存失效”。可见命名的困难性。在选择名称时,我们遵循如下原则:

  • 选择能够准确表达代码意图的名称
    1
    2
    3
    // GetPage函数不能表示page从哪里获取。推荐使用FetchPage和DownloadPage进行替换。
    def GetPage(url):
    ...
    下面展示了英文中单词对应的同义词
单词 同义词
send deliver,dispatch,announce,distribute,route
find search,extract,locate,recover
start launch,create,begin,open
make add, push, enqueue
  • 避免使用泛化的名称,比如tmp、retval之类的东西。
    但在一些特殊情况下可以使用,比如循环、交换两个数时的临时变量中
  • 名称中附带额外信息
    下面展示了一些具体的例子
函数参数 推荐重构后的内容
Start(int delay) delay -> delay_secs
CreateCache(int size) size -> size_mb
ThrottleDownload(float limit) limit -M max_kbps
  • 根据变量的作用域选择合适的名称
    变量的作用域如果比较长,则变量名称尽量多携带信息;变量作用域如果比较短,则变量名称推荐用简单的、简短的。

  • 英文中一些推荐变量命名做法

    • 使用min和max表示下限和上限(包含)
    • 使用first和last表示范围,其中last包含最后一个数
    • 使用begin和end表示范围,其中end是最后一个数的下一个数
  • 布尔命名中避免携带否定词
    disable_ssl就没有use_ssl 好
  • 代码需要有段落
  • 如果代码表达的意思已经很明确了,那么没有必要添加注释

简化循环和代码逻辑

  • 尽早从函数返回(卫语句使用)
  • 避免使用do-while循环语句
  • 减少代码中的嵌套(比如可以使用提前返回)
  • 代码中的巨大表达式要进行简化
  • 代码中使用的变量越多,程序阅读起来就越困难
    1
    2
    3
    下面的now完全没有必要,直接用后面的内容替换now就行
    now = datetime.datetime.now()
    root_message.last_view_time = now
  • 尽量降低变量的作用域。(全局变量越少越好,其实就是降低耦合度)
  • 变量应该在需要的时候进行定义,而不是在函数的开头就定义当前函数中需要的所有变量
  • 变量改变的越多,追踪变量的当前值就越困难(尽量使用只写一次的变量,比如java中的final,go中的const等)

重新组织代码

  • 首先用自然语言描述代码需要完成的功能,然后在进行实现;这个方法在重构代码时也非常有用。
  • 尽量创建通用的代码,与项目无关的代码尽量放到独立的模块中,比如util模块中。
  • 代码应该只做一件事情(do only one task at a time);借鉴:unix哲学中,一个函数只做一件事。
  • 对于用到的语言,第三方库要尽量掌握,并熟悉常用的函数。

单元测试可读性

  • 测试代码提高可读性要遵循一个原则是:对用户隐藏不重要的细节,将重要的细节进行展示。
  • 测试的输入内容应该是完全覆盖被测代码,并且在满足这一前提下是最简单的
  • 好的代码是更加容易测试的
  • 在测试代码中,可以使用帮助函数来简化测试代码。

运行kubectl run会发生什么(what happens when i type kubectl run)

发表于 2024-04-17 | 分类于 k8s

原文链接:https://github.com/jamiehannaford/what-happens-when-k8s

想像一下当你运行下面的命令时

1
kubectl create deployment nginx --image=nginx --replica=3

在一切顺利的情况下,在k8s集群中能够看到生成了3个pod。那么在底层到底发生了什么?本篇文章试图解决一个请求从客户端到kubelet整体的流程,并且也链接了源码。

目录

  1. kubectl
    • Validation and generators(验证和生成)
    • API groups and version negotiation(API组和版本协商)
    • Client auth(客户端认证)
  2. kube-apiserver
    • Authentication(认证)
    • Authorization(鉴权)
    • Admission control(admission控制)
  3. etcd
  4. Initializers
  5. Control loops
    • Deployments controller
    • ReplicaSet controller
    • Informers
    • Scheduler
  6. kubelet
    • Pod sync
    • CRI and pause containers
    • CNI and pod networking
    • Inter-host networking
    • Container startup
  7. Wrap-up(总结)

kubectl

Validation and generators

针对开头提到的命令,在命令行输入回车后,会发生什么呢?
kubectl首先会进行客户端侧的验证。针对非法的命令(比如不支持的资源或镜像不合法)能够快速失败提示,避免发送到kube-apiserver,从而降低k8s负载。
经过验证后,kubectl组装将要发送到api-server的http请求。在k8s中,获取或者改变k8s中的状态都要通过apiserver,apiserver负责与etcd交互。kubectl使用generators来构建http请求,generators是负责序列化的一种抽象(注:可以简单的把generators理解为帮帮助用户构建完成的资源的工具,比如kubectl create中我们只指定了部分的内容,剩下的内容是generators帮助我们生成的)。
kubectl run 命令可以通过使用—generator参数指定运行多种资源,而不仅仅是deployments。如果不使用—generator参数,kubectl可以主动推断要运行的资源。
例如,参数--restart-policy=Always会触发deployments,而--restrart-policy=Never会触发pods。kubectl也会根据其他的参数确定需要执行的其他动作。比如记录命令(回滚或审计),或者--dry-run
在确定要创建的资源是Deployment后,kubectl将会使用DeploymnetAppsV1 generator来根据命令行参数构造runtime object。runtime object是k8s resource的通用表示。

API groups and version negotiation

k8s使用版本API,并且每个版本有自己所属的API groups。API groups意味着一些相似资源的集合。这比使用单调递增的版本号更加灵活。比如deployment的api group是apps,最近的版本是v1。这也是在deployment中写apiVersion: apps/v1的原因。
kubectl生成了runtime object后,kubectl开始找到正确的API group和version,并且会组装versioned client。versioned client能够意识到资源的各种REST 语义。这个阶段被称为版本协商,它涉及kubectl扫描远程API上的/apis路径,从而检索所有可能的API组。由于kube-apiserver在此路径上公开了其模式文档(以OpenAPI格式),因此客户端可以很容易执行自己的发现。
为了提高性能,kubectl会将OpenAPI schema缓存到 ~/.kube/cache/discovery 目录中。如果想要看到API 发现的过程,那么可以将这个缓存目录删除,然后运行kubectl命令时带上-v参数。你将会看到查询API versions的所有http请求。
kubectl最后一步是发送http请求。一旦接收到成功的响应后,那么kubectl就会按照期望的格式打印结果。

Client auth

为了成功发送请求,kubectl需要能够处理认证信息。用户的凭证一般存储在kubeconfig文件中,但是有时候也在其他地方。kubectl做了下面的事情来决定用户凭证:

  • 如果kubectl中使用了--kubeconfig,那么就使用这个参数指定的文件
  • 如果当前环境变量中定义了$KUBECONFIG,那么使用这个环境变量指定的文件
  • 否则使用推荐的主目录,比如~/.kube,然后使用此目录中的第一个文件
    当kubectl解析了凭证文件后,kubectl会决定当前需要使用的上下文,当前需要连接的集群,与当前用户关联的认证信息。如果kubectl中提供了指定的参数,比如--username, 那么这些参数将会覆盖凭证文件的配置。一旦有了这些信息,那么kubectl将会用这些信息去修饰http请求:
  • 使用tls.TLSConfig来发送x509证书
  • bearer token放到http header中的”Authorization”
  • username 和 password通过http basic authentication发送
  • OpenID认证流程是由用户事先手动处理的,生成一个类似于Bearer令牌的token,然后发送。

kube-apiserver

Authentication(认证,确定用户是谁)

k8s客户端和k8s系统组件与k8s交互主要是通过kube-apiserver来进行,比如获取和存储系统状态。kube-apiserver第一步要做的就是要验证请求的身份信息,这一步叫做认证(authentication).
apiserver如何进行认证呢?首先,apiserver在启动时,它会首先检查启动参数,然后组装一系列的认证器(authenticators)。举个例子,比如--client-ca-file传递进来,那么会添加x509认证器;如果提供--token-auth-file,那么会添加token认证器。apiserver收到的每个请求都会经过这些认证器,并且会依次进行认证,直到有一个认证器成功,那么请求就通过了。

  • x509 handler 将会检查请求的证书是否有效,并且是否在指定的ca文件中
  • bearer token handler 将会检查token(http Authorization header)是否在指定的文件中(由--token-auth-file指定)
  • basic handler仅仅简单的检查http 请求中的basic 认证信息是否与本地状态匹配
    如果每个认证器都认证失败了,那么一个组装的错误(每个错误的认证)会返回。如果认证成功,那么在http header中的Authorization会移除,并且用户信息会添加到上下文中。这样后续的apiserver处理逻辑(比如authorization和admission controllers)能够直接获取到用户信息。

Authorization(鉴权,确定用户是否有权限)

经过认证后,apiserver会进行鉴权检查,确定执行的请求动作是否允许。
apiserver处理鉴权跟认证类似。基于启动的参数,apiserver会组装一系列的鉴权器(authorizers), 这些鉴权器会依次检查请求。如果所有的鉴权都没有通过,那么请求会被拒绝并返回给客户端。如果其中某个鉴权器通过,那么请求会继续。
下面展示了一些鉴权器:

  • webhook,负责与k8s集群外的http(s) service进行交互的
  • ABAC,执行定义在static文件中的鉴权策略
  • RBAC,采用RBAC角色来进行鉴权
  • Node,确保node clients(比如kubelet)只能够访问托管到自身上的k8s资源。

    Admission control

    从apiserver的角度看,它已经确定了请求是谁以及请求是否能够执行。但是对于kubernetes,系统的其他部分对于能够做什么和不能做什么都有自己的想法。这就是admission controller要做的事情。
    鉴权侧重于用户是否有权限,而admission controllers是用来确保请求符合期望以及遵守集群规则。amission controller是对象存储到etcd之前的最后一个处理,因此它封装了系统剩余的检查以确定请求不会产生意想不到的后果。
    admission controllers的逻辑与authenticators和authorizers类似,但是不同之处是:不像authenticator和authorizers链式调用,如果一个admission controller失败,那么整个admission chain会失败。
    amission controllers比较优秀的设计是专注于提升扩展性。每个controller都作为一个插件放置到plugin/pkg/admission目录下,每个controller都满足一个比较小的接口定义。然后编译到k8s中。
    admission controllers通常分为这几类,资源管理、安全性、默认设置和引用一致性。下面列出了专注于资源管理的admission controller:
  • InitialResources:根据过去资源使用情况,设置默认的资源限制
  • LimitRanger:设置容器资源的request和limit或者为资源设置资源上界。
  • ResourceQuota:确保集群中资源使用量不会超过配额

etcd

apiserver验证了请求后,下一步apiserver反序列化http请求,从http请求中构建runtime objects(类似于kubectl generators的反过程),然后将objects存储到底层存储中。将这个过程细化如下。
首先,apiserver在接受到请求后是怎么知道如何处理呢?这个过程比较复杂。我们首先看下apiserver在启动时都会做什么:

  1. 当前apiserver启动时,它会创建server chain,是一个组合的调用链。基本上是由多个apiserver组成的。
  2. server chain中有个通用的apiserver,是一个默认的实现
  3. openAPI schema会填充到apiserver的配置中
  4. apiserver遍历所有在openAPI schema中定义的api groups,然后为每个api group都配置一个storage provider,这些storage provider是一个通用的存储抽象。apiserver主要通过与这些storage provider交互来存储和检索数据。
  5. 在每个api group中,apiserver会遍历所有的版本,然后为每个http 路径配置rest mapping。这样apiserver就能够将收到的外部请求与对应的处理逻辑关联起来
  6. 针对我们上面的例子,apiserver已经注册了post handler。这个handler被委托用来创建资源
    到这里,apiserver已经知道有哪些http route存在,并且如何将handler跟route进行映射。让我们现在看下http 请求进来后的处理
  7. 如果请求跟某个特定的route匹配上,那么就会交给对应的handler进行处理。如果没有匹配上,那么会采用基于url 路径的方式进行匹配。如果针对这个路径也没有匹配上,那么一个not found handler会被调用,然后返回404错误。
  8. 针对我们的例子,有一个注册的路由会调用createHandler方法。它首先解码http请求,执行基础的验证。比如确保http提供的json格式复合对应的版本资源。
  9. 触发审计以及最后的admission
  10. 专用的storage provider将资源存储到etcd中。etcd中资源的key通常是<namespace>/<name>,但是这个格式是可以配置的
  11. 创建中的任何错误都会被捕获,最后storage provider会执行get请求来确保资源一定是创建成功的。如果需要,还会调用而外的后处理逻辑。
  12. 构造http 响应并返回
    通过上面的步骤,我们知道apiserver做了很多的步骤。当前deployment已经成功的在etcd中创建了。但是当前deployment还没有被调度

    Initializers

    在object存储到etcd之后,apiserver或者调度器并不能立马看到它。直到一系列的initializers执行完毕后,才会看到这个对象。初始化控制器将资源类型和要执行的逻辑进行关联。如果一个资源没有注册初始化控制器,那么初始化阶段会调用,这个资源立马会被apiserver或调度器看到。
    初始化控制器给我们提供了扩展k8s的能力,比如:
  • 给pod注入代理sidecar 容器,或者获取特定的注解
  • 向特定的命名空间下的pod注入证书
  • 阻止创建小于20个字符的secret小于
    initializerConfiguration允许你声明哪些资源运行哪些initializers。假设我们想要一个在每个pod创建时都运行的initializer,我们的配置如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    apiVersion: admissionregistration.k8s.io/v1alpha1
    kind: InitializerConfiguration
    metadata:
    name: custom-pod-initializer
    initializers:
    - name: podimage.example.com
    rules:
    - apiGroups:
    - ""
    apiVersions:
    - v1
    resources:
    - pods
    上面的InitializerConfiguration会给每个pod的metadata.initializers.pending字段添加podimage.example.com。initializer controller已经提前部署好,initializer会扫描每个新建的pod。当发现pod的metadata.initializers.pending字段不为空时,initializer controller就会执行。执行完后,会将对应的podimage.example.com删除。当所有的初始化器执行完毕并且pending字段为空,则此对象被认为已经初始化完成。
    可能你已经发现一个问题,如果资源对于apiserver不可见,那么我们这里的初始化控制器怎么处理呢?apiserver暴露了一个includeUninitialized查询参数来返回所有的对象,包括那些还未初始化的对象。

Control loops

Deployments controller

到现在这个阶段,deployment已经存储到etcd中并且初始化逻辑已经完成了。接下来就要设置资源拓扑。deployment就是replicasets的集合,而replicaSet就是pod的集合。k8s如何从http 请求中去创建这些对象呢?这就是k8s内置controller需要做的内容。
k8s在整个系统中大量使用了”控制器”思想。控制器就是一个异步过程,调谐k8s当前的状态到期望状态。每个控制器有自己的职责,并且由kube-controller-manager组件负责并发运行。下面介绍deployment controller
在 deployment存储到etcd并且初始化完成后,那么deployment就能够被kube-apiserver看到。deployment controller能够检测到可用的deployment资源以及对于资源的变更。在我们的例子中,deployment controller通过informer为创建事件注册了一个特定的回调。
当deployment可用时,handler开始执行,将对象添加到内部的工作队列中。当开始处理这个对象时,控制器会检查deployment,发现deployment没有关联的ReplicaSet或者Pod记录。这是通过使用标签选择器来查询kube-apiserver来完成的。有趣的是,这个同步过程是状态未知的:调谐新的记录跟调谐旧的记录是一样的工作方式。
在意识到相关的ReplcaSet和Pod不存在后,deployment controller会开启scaling process来解析状态。创建一个ReplicaSet资源,给它分配标签选择器,然后给定数字1的版本号。ReplicaSet中的PodSpec是从deployment清单中拷贝而来,类似于其他相关的元数据。有些时候,deployment记录在这个过程后也需要对应的更新(比如,设定了截止日期)
更新deployment中的status字段,然后控制器重新进行调谐过程,然后等待deployment达到期望的状态。由于deployment controller仅仅关心创建ReplicaSets,接下来的工作需要由下一个控制器来完成,既ReplicaSet controller。

ReplicaSet controller

在前面的步骤中,Deployments controller创建了ReplicaSet,但是还没有创建Pods。这就是ReplicaSet 控制器发挥作用的时刻。ReplicaSet controller的主要工作就是监视ReplicaSet的生命周期和对应的Pods资源。类似于大多数其他的控制器,它通过在特定事件上触发handler来实现。
我们感兴趣的事件是创建。当ReplicaSet被创建后。ReplicaSet controller检查新的replicaSet的状态,然后发现当前的状态跟期望的状态不一致。它试图通过增加属于ReplicaSet的Pod数量来达到目标状态。它以谨慎的方式来创建它们,确保ReplicaSet的数量始终匹配。
创建Pods的过程是批量处理的,以SlowStartInitiaBatchSize开始,然后每次都以两倍的数量来启动其他的Pods。这样做的目的是确保当pod创建失败时,避免大量的http 请求到apiserver。如果要失败,可以采用对组件影响最小的方式来进行。
k8s通过Owner References(child资源中的一个字段,用来指向父资源的id)来确保对象的引用关系。这样做,不仅能够确保一旦父资源被删除,子资源也会被删除(级联删除),同时还能够保证父资源不会竞争子资源(想象一下,两个潜在的父资源认为他们拥有同样的子资源)
Owner Reference另一个好处是,它是有状态的。如果控制器重启了,那么重启过程中,不会影响到这些资源,因为资源拓扑是独立于controler的。这种对隔离的关注也渗透到控制器本身的设计中:它们不应该操作它们没有明确拥有的资源。相反,控制器应该只操作它们明确拥有的资源。
然而,有时候会存在”孤儿”资源,比如下面这些情况:

  1. 父资源删除,但是子资源没有删除
  2. 垃圾回收策略禁止删除子资源
    如果发生了这种情况,那么控制器会确保这些“孤儿”资源被新的父资源领养。多个父资源能够竞争性的去领养孤儿资源,但是只有一个能够成功。

    Informers

    rbac authorizer 或者 deployment controller需要获取集群状态,从而实现相关逻辑。比如以rbac authorizer为例,当请求进入时,authenticator需要将用户的初始状态保存起来以供后续使用。rbac authorizer后续会使用用户的初始表示来获取在etcd中相关的角色和角色绑定关系。控制器应该如何访问和修改这些资源?k8s中提供了informers来实现。
    informer是一种模式,既允许控制器订阅存储时间以及查询资源列表。informer除了提供一种抽象外,它还封装了大量的工作,比如缓存(缓存很重要,不仅可以减少apiserver连接数,同时还可以较少服务端和客户端的序列化)。informer还提供了线程安全的方式来操作资源。

    Scheduler

    在所有的控制器都运行后,当前在etcd中会存在一个deployment,一个ReplicaSet以及三个pod资源。此时,pod的状态是Pending,因为它们还没有被调度。最后的一个控制器是scheduler,它负责调度pod。
    scheduler是作为控制面的一个标准组件,它的运行机制跟其他的控制器类似。它监听事件,然后尝试将状态调整到期望状态。在这个例子中,调调度器筛选出NodeName为空的pod,然后尝试将它们调度到合适的节点上。
    为了找到合适的节点,scheduler使用了专门的调度算法。默认的调度算法工作如下:
  3. scheduler启动时,会注册一系列的默认predicates。这些predicates会检查pod是否满足特定的条件,比如:如果PodSpec中明确指定了CPU或RAM资源,如果Node上的资源不满足,那么此Node就会被过滤掉。
  4. 一旦选择了一些符合条件的节点,那么接下来priority函数将会给这些节点打分。比如:为了在集群中使得资源均匀分配,那么节点剩余资源较多的节点会得到更高的分数,最终选择一个分数最高的节点作为目标节点。
    当找到合适节点后,scheduler会创建一个Binding Object,其中名称和UID会匹配对应的pod。Binding Object的ObjectReference字段会包含选择的目标节点。通过post请求会发送给apiserver。
    当apiserver接受到Binding object后,它会反序列化请求,然后将信息更新到pod对象上:设置pod的NodeName字段,添加相关的注解以及将PodScheduled设置为True.
    一旦scheudler将pod调度到目标节点后,剩下的工作就交与kubelet来完成了。

    注:定制调度器。通过--policy-config-file可以定制predicate和priority函数。我们可以在k8s集群中运行特定的调度器,比如采用deployment运行。如果在PodSpec中指定了schedulerName,那么k8s会将调度过程交给这个调度器执行。

kubelet

Pod sync

让我们总结下上面的过程:

  1. HTTP请求经过认证、鉴权以及admission controller阶段;
  2. 一个deployment、一个replicaSet、三个pod持久化到etcd中
  3. 一系列初始化器运行
  4. 最后,每个pod被调度到合适的节点上
    到这个时候,数据仅仅存在于etcd中。接下来,就需要将pod在worker节点上启动。这个是通过kubelet组件来实现的。
    kubelet是k8s集群中运行在每个节点上的组件,主要负责管理pod的生命周期。这意味kubelet需要将pod转换成具体的容器、网络等。同时,它也负责处理卷挂载、容器日志、垃圾收集以及其他重要的事情。
    另一种考虑kubelet的方式是将它看做是一个控制器。kubectl从kube-apiserver每个20秒钟(可以配置)查询pod信息,挑选出NodeName于当前节点名称一致的pod。然后它将pod与本地内部缓存中pod的信息进行比较,如果发现有差异就进行同步。下面来具体看下同步的过程是怎么样的:
  5. 如果pod是要进行创建,那么kubelet会注册一些metrics,这些metrics主要给prometheus用于追踪pod的延迟。
  6. kubectl生成PodStatus对象,PodStatus代表了Pod的当前状态(Phase)。pod的状态是pod生命周期的更高一级的抽象。比如Pending,Running,Succeeded,Failed和Unknown。产生这些状态是比较复杂的,让我们深挖一下:
    • 首先,一系列的PodSyncHandlers会顺序执行。每个handler都会检查pod是否应该在当前节点上。如果任意的handler判断pod不应该在当前节点上,那么pod的phase将会变成PodFailed,并且pod会从此节点上被剔除。比如在Job资源中设置了activeDeadlineSeconds,超过了设置的时间,那么pod就会被剔除。
    • 接下来,pod的phase会由初始化容器以及主容器的状态来决定。因为在这里容器还没有启动,这些容器会被归类为等待。具有等待容器的pod的phase都是Pending
    • 最后,Pod Condition会由容器的Condition来决定。因为我们的容器当前还没有被容器运行时创建,所以kubelet会将PodReady condition设置为False
  7. 当PodStatus产生后,PodStatus对象会被发送到Pod的status manager中。该管理器的任务是通过apiserver异步更新etcd记录
  8. 接下来,将运行一系列准入处理程序,以确保pod具有正确的安全权限。这包括强制的AppArmor profiles 和 NO_NEW_PRIVS。在这个阶段被拒绝的pod将无限期地处于挂起状态。
  9. 如果kubelet在启动时指定了cgroups-per-qos参数,那么kubelet将会为pod创建cgroups并且会应用这些资源参数。这是为了实现对pod更好地服务质量(QoS)处理。
  10. 给pod创建数据目录。包括pod目录(通常是:/var/run/kubelet/pods/<podID>),卷目录(<podDir>/volumes)以及插件目录(<podDir>/plugins)
  11. 卷管理器(volume manager)等待Spec.Volumes中定义的卷准备好。根据挂载卷的类型,有些pod需要等待更长的时间(比如云或NFS卷)。
  12. 在Spec.ImagePullSecrets中定义的secrets资源,会从apiserver中获取。从而后续会注入到容器中
  13. 容器运行时开始启动容器

    CRI and pause containers

    到现在,大多数工作已经完成了,容器已经准备好启动了。启动容器的部分叫做容器运行时(Container Runtime,比如docker或rkt)。
    为了增加扩展性,kubelet自从v1.5.0版本以来,提出了CRI(Container Runtime Interface),CRI用于与具体的容器运行时进行交互。总的来说,CRI是一个抽象层。kubelet和容器运行时通过protocal buffers以及对应的gRPC API接口进行交互。CRI带来了很大的好处,如果后续替换下层的容器运行时,核心的k8s代码不需要做任何修改。
    我们回到部署容器的过程。当pod启动后,kubelet通过RPC启动RunPodSandbox。沙箱是一个CRI术语,用来描述一组容器,在k8s中被称作pod。
    在我们的例子中,我们使用了docker。在沙箱中首先创建一个”pause”容器。pause容器主要是为了给pod中其他的容器提供服务,提供必要的资源。这些资源包括linux namespace(IPC、网络、PID等)。如果你不熟悉linux中容器如何工作。我们快速说一下。linux内核有命名空间的概念,命名空间主要是隔离专有的资源,比如CPU、内存。然后将这些资源指定给特定的进程,从而保证资源只能被指定的进程使用。Cgroups主要是给linux提供了资源分配的方式。Docker利用了linux命名空间和Cgroups来实现容器。
    pause容器提供了所有的命名空间,并且允许子容器共享他们。作为同一个网络命名空间的一部分,一个好处是同一个pod中的容器可以通过localhost互相引用。pause容器的第二个角色是,PID命名空间如何工作。进程形成了等级树,最顶层的init进程,负责收割死进程。在pause容器创建后,它被检查点指向磁盘,然后启动。

    CNI and pod networking

    pod当前具有了基本的内容:pause容器持有所有的命名空间,从而pod内部的容器可以交流。但是网络如何工作以及如何设置?
    kubelet将设置网络的工作委托给了CNI(Container Network Interface)插件。CNI的工作方式类似于Container Runtime Interface。简单来说,CNI提供了一种抽象,用于给不同的网络提供者使用不同的网络实现。kubelet与这些注册的CNI插件通过流式json数据进行交流。下面展示了json配置的例子:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    {
    "cniVersion": "0.3.1",
    "name": "bridge",
    "type":"bridge",
    "bridge": "cnio0",
    "isGateway": true,
    "isMasq": true,
    "ipam": {
    "type": "host-local",
    "ranges": [
    [{"subnet": "${POD_CIDR}"}]
    ],
    "routes": [{"dst": "0.0.0.0/0"}]
    }
    }
    json也为pod提供了特定的额外的元数据,比如通过CNI_ARGS环境变量提供名称和命名空间。
    接下来发生的事情与具体的CNI插件有关,我们看看bridge CNI插件的工作:
  14. 插件首先在root命名空间中设置一个本地的linux bridge,这个bridge为当前host(本机)上的所有容器服务
  15. 接下来,插件创建veth pair,其中一端在pause容器,另一端在bridge中。可以将veth pair想象成一个管道,一端连接到容器,另一端连接到root 网络命名空间,从而允许容器和root网络命名空间进行通信。
  16. 现在应该给pause容器分配IP地址,并且设置路由。这样做之后pod就有了自己的ip地址。ip地址分配是委托给IPAM,IPAM在json中定义。
    • IPAM插件类似于主要的网络插件:通过二进制调用并且有标准化的接口。每种IPAM插件都必须去顶容器的IP/子网,网络和路由,然后将这些信息返回给CNI插件。最常用的IPAM插件是host-local,它从一个预先定义好的地址范围中分配IP地址。它将已经分配的ip地址存储在本地的文件系统中,从而确保在本机上不会重复使用ip地址。
  17. kubelet会指定一个内部的DNS服务器ip地址给CNI插件,保证容器的resolv.conf文件被正确设置。
    一定这个过程完成,CNI插件会返回json数据给kubelet,表明网络设置的结果。

    Inter-host networking

    我们现在仅仅说了容器如何同主机进行通信。但是如果pod在不同的主机上,那么pod之间如何通信呢?
    这通常是基于一种叫做overlay networking的技术来实现,这是一种跨多个主机动态同步路由的方法。一个流行的overlay netwoker实现是Flannel。Flannel的核心作用是在不同节点之间提供IPv4网络。Flannel不会控制容器如何跟主机通信(这是CNI的职责),而是负责流量在不同主机间的转发。为了实现这个功能,Flannel为主机选择了一个子网,将这个信息存储到etcd中。它保留集群路由的本地表示,并将传出的数据包封装在UDP数据报中,确保它到达正确的主机。

    Container startup

    所有的网络工作都完成了,接下来就是启动工作容器了。
    一旦沙箱完成初始化并且仍然是活跃的,那么kubelet将会为其创建工作容器。kubelet首先启动定义在PodSpec中定义的初始化容器,然后启动主容器。主要过程如下:
  18. 拉取容器的镜像。PodSpec中定义的secrets用于私有仓库
  19. 通过CRI创建容器.填充ContainerConfig结构体(包括启动命令、镜像、标签、挂载、环境变量等),数据来自于PodSpec。然后通过protobufs协议发送给CRI插件。对于Docker来说,docker会反序列化请求,然后填充到自己的config结构体,然后发送给docker的后台进程。在这个过程中,它将一些元数据标签(如容器类型、日志路径、沙盒id)应用到容器。
  20. 然后,它向CPU管理器注册容器,这是1.8中的一个新的alpha功能,通过使用UpdateContainerResources CRI方法将容器分配给本地节点上的CPU集。
  21. 容器启动
  22. 若定义了post-start hook,则执行这些hook。hooks可以是Exec(在容器中执行特定的命令)或Http(执行一个http请求)。如果PostStart hook需要很长时间运行、或者失败,那么容器将不会到达running状态

Wrap-up(总结)

到这里,已经产生了三个容器,他们可能运行在一个或者多个工作节点上。所有的网络、卷、secret都已经被kubelet填充,并且通过CRI插件填充到了容器中。

redis study

发表于 2024-04-01 | 分类于 redis

redis 数据结构

简单动态字符串

redis中字符串是自己定义的结构,名为SDS。不是用的c语言的字符串。
SDS的定义如下:

1
2
3
4
5
6
7
8
struct sdshdr {
// 记录buf数组中已使用字节的数量
int len;
// 记录buf数组中未使用字节的数量
int free;
// 字节数组,用于存储字符串;其中最后一位保存'\0'字符
char buf[];
}

为什么要自己实现,而不是复用c中对于字符串的定义?

  • 获取字符串长度,在O(1)复杂度获取
  • 杜绝缓冲区溢出。SDS在进行修改时,会检查SDS空间是否满足要求。若不满足,则会进行扩容。
  • 二进制安全,c中的字符串必须符合某种编码格式(比如ASCII),并且除了字符串的末尾之外,字符串中不能包含空字符。而sds使用len记录字符串长度,所以是二进制安全的。

SDS的扩容机制:

  • 空间预分配(若对sds进行修改,先将len设置为需要的空间大小)
    • 若sds中len小于1MB,则将free设置为跟len一样的大小
    • 若sds中len大于1MB,则将free设置为1MB
  • 惰性空间释放
    • 若释放sds中内容,则free中进行增加。实际占用的空间不释放;当然也提供了相应的api,在需要时,真正释放sds未使用空间。

链表

redis中链表结构
链表中节点定义

1
2
3
4
5
6
7
8
typedef struct listNode {
// 前置节点
struct listNode *prev;
// 后置节点
struct listNode *next;
// 节点值
void *value;
} listNode;

链表定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct list {
// 表头节点
listNode *head;
// 表尾节点
listNode *tail;
// 链表包含的节点数量
unsigned long len;
// 节点值复制函数
void *(*dup) (void *ptr);
// 节点值释放函数
void (*free) (void *ptr);
// 节点值对比函数
int (*match) (void *ptr, void *key);
} list;

注:链表节点使用 void*指针保存节点值,并且通过list中的dup、free、match三个属性为节点值设置类型特定函数,所以链表可以保存各种不同类型的值。

字典

跳跃表

整数集合

压缩列表

对象

Top 5 things every apache kafka developer should know

发表于 2023-12-09 | 分类于 kafka

top 5 things every apache kafka developer should know

翻译自:https://www.confluent.io/blog/top-5-things-every-apache-kafka-developer-should-know
介绍下面5个内容

  • 理解消息传递和持久化保证
  • 学习producer api中的新的粘性分区(learn about the new sticky partitioner in the producer API)
  • 利用cooperative rebalancing(协同重平衡)来避免消费者组执行rebalance时的stop the world
  • 掌握常用命令行工具
    • kafka console producer
    • kafka consule consumer
    • dump log
    • delete records
  • 使用record headers的能力
    • 为kafka记录增加headers
    • 检索headers

Tip1:理解消息传递和持久化保证

针对数据持久化,KafkaProducer提供了不同的配置。acks 配置指定了当生产者接受到多少消息确认后,才认为记录已经成功发送到broker上。kafka提供了以下三种选择:

  • none: 生产者不等待broker的确认,发送消息后就认为已经成功发送到broker上。
  • one: 生产者等待leader broker的确认(leader broker有一个),一定收到确认,就认为消息发送成功
  • all: 生产者需要等待所有的ISR(in-sync replicas) broker都确认消息后,才认为消息发送成功。
    如果需要更到的发送吞吐量,可以损失一定的数据,那么可以使用none或one。而如果应用不能容忍数据丢失,那么可以设置all,但是这样吞吐量会降低。
    这里需要说明下acks=all的情况。下面的场景描述中,producer都是使用acks=all来发送消息,并且topic副本数是3,一个leader,两个follower。
    情况1:如果这些副本中的记录偏移量是一致的,那么他们被认为是in-sync的。如下面所示,producer采用acks=all的情况: 情况2:假设由于某些情况(网络分区,负载过高等),导致两个follower没有跟上leader,那么follower就不是in sync的。此时生产者发送消息,那么实际的确认只会有一个。acks=all并不是指定有多少副本必须在in-sync。leader broker始终跟自己是同步的。

一般来说,设置acks=all, 我们的要求通常都是所有副本都应该确认,或者至少大量的in sync副本应该确认。如果不是这样,那么应该抛出异常知道所有副本都在in sync中。
为了满足这个要求,kafka提供了这样一个配置:min.insync.replicas. 这个配置强制指定多少个副本写成功才被认为真正写成功。需要注意的是,min.insync.replicas配置在是broker或者topic级别,而不是在producer上。min.insync.replicas默认值是1。所以为了避免上面说的情况,在三个副本的情况下,需要将min.insync.replicas设置为2。

上图中展示了in sync中的副本不满足min.insync.replicas要求的情况,此时producer发送的消息,leader broker不会将记录添加到log中,而是会抛出NotEnoughReplicasException 或者 NotEnoughReplicasAfterAppendException。副本与leader不一致被认为是一种可以重试的错误,所以producer会重试直到成功或者达到超时时间(默认值两分钟)delivery.timeout.ms。
如果需要非常高的数据持久化保证,那么应该同时设置min.insync.replicas和acks=all.

Tip 2: Learn about the new sticky partitioner in the producer API

kafka需要partition来提升吞吐量并且将消息均衡到不同的broker上。kafka的消息记录是key/value格式,其中key可以为null。kafka producer在发送消息时,不会立即发送,而是将消息放置到对应的partition batch中(类似缓存),待缓存满了,在一次发送。batch是一种增加网络利用的有效方式。在将消息发送到partition中,通常有三种方式来决定发送到哪个partition上。

  • 方式1:在发送消息时,直接指定消息对应的partition。这种情况,producer直接使用这个partition
  • 方式2:如果没有提供partition,消息包含key,那么producer会使用key的hash值来决定partition。
  • 方式3:如果既没有key也没有提供partition信息,那么kafka会使用round-robin的方式将消息发送到不同的partition中。producer会将第一个消息发送到partition 0,第二个消息发送到partition 1,以此类推。

下图展示了方式3:

round robin方法对于将消息均衡到不同的partition上工作的很好。但是存在一个缺点,由于producer是依次将消息发送到不同的partition batch中,那么有可能会出现每个partition中的batch都填充不满。比如下面展示的,topic有三个partition。假设应用产生了9条消息,并且消息没有key,所有的消息几乎是同时发送,如下图:

9条记录分散到是三个batch中,每个batch有三条。但是如果我们将9条消息放到一个batch中会更好。更少的batch使用更少的网络带宽并且对于broker的负载更小。
kafka 2.4.0新增了sticky partitioner approach. 这种方法能够将消息发送到一个partition的batch中直到此batch满了。然后,发送这个batch,sticky partitioner使用下一个partition的batch。如下图展示了使用sticky partitioner的例子:

通过使用sticky partitioner方法,我们减少了请求次数,同时也减少了请求队列上的负载,也减少了系统延迟。需要注意的时,sticky partitioner仍然是将消息均衡放置到不同的partition batch中。可以将这种认为是per-batch round robin 或者 eventually even approach。
如果想要更多了解sticky 模式,可以参考 Apache Kafka Producer Improvements with the Sticky Partitioner

Tip 3: Avoid “stop-the world” consumer group rebalances by using cooperative rebalancing

kafka是一个分布式系统,而分布式系统中一个重要的事情就是如何处理失败。kafka处理失败的方式之一是使用consumer group,consumer group管理多个consumer。如果其中一个consumer停止,kafka会进行rebalance从而确保另一个consumer能够接管这个工作。
从2.4版本开始,kafka引入了一个新的rebalance协议,cooperative rebalancing。在深入了解cooperative rebalancing之前,先来了解一下consumer group的基础。
假设一个分布式应用(比如一个微服务的多个副本)有个多个consumer,订阅同一个topic。这些consumer组成了一个consumer group,具有同样的.group.id。在consumer group中的每个consumer负责从一个或多个partition中消费消息。这些partition的分配是由consumer group中的leader进行的。如下图所示:

从图中可以看到,总共有6个partition,在理想的情况下,每个consumer负责消费两个partition。但是如果其中的某个应用失败了或者不能连接网络。那么对应的partition中的消息是不是就不能被消费直到应用恢复?幸运的是,由于consumer rebalancing协议的存在,不会发生这种情况。
下图展示了consumer group protocal过程:

如上图,consumer2由于某些原因失败了。group coordinator将它从组中移除然后触发rebalance。rebalance尝试将工作负载在组内所有工作的consumer上进行均衡分布。在这个例子中,consumer2离开了组,rebalance会将consumer2拥有的partition分配给组内其他的consumer。所以对于一个consumer group,如果其中consumer失败了, 那么对于这些partition的处理不会产生影响。
但是,默认的rebalance协议有个缺点。在rebalance过程中,每个consumer都会放弃之前获得的partition(这会造成consumer停止消费),知道topic下所有的partition都被重新分配。这种情况被称为stop the world rebalance。为了解决这个问题,依靠ConsumerPartitionAssignor实例,consumer简单的重新获取之前分配的partition,所以在这些partition上仍然能够继续消费。
上述描述的实现被称为eager rebalancing, 因为它优先考虑的是针对一个consumer group中,不会有两个consumer同时对于一个partition拥有主权。
虽然对于同一个topic下的某个partition不能具有相同的consumer非常重要,但是有一种更好的方法,既能够提供安全性同时还不会暂停处理,既incremental cooperative rebalancing。这个方法在kafka2.3版本的kafka connect中被首次引入,现在已经在consumer group 协议中实现了。利用cooperative 方法,消费者不会在rebalance开始时主动放弃partition的所有权。在cooperative方法中,consumer group中的所有成员会将当前的分配进行编码然后将信息发送到group leader中。group leader决定那个partition需要修改对应的consumer。而不是一开始就完全从新分配。之后第二次rebalance发起,但是这一次,仅仅涉及到那些需要改变所有权的分区。这有可能是撤销不在用的partition或者新增的partition。对于那些没有改变所有权的分区,这些分区中的数据会继续进行处理。
这种处理办法解决了stop-the-world,而仅仅是暂停了哪些需要修改所有权分区的消费。这带来了更少的rebalance代驾以及降低了完成rebalance的时间。即使rebalance时间很长也没有关系,因为现在数据仍然被处理。使用CooperativeStickyAssignor能够开启这个功能。
如果要开启这个功能,则需要将partition.assignment.strategy设置为使用CooperativeStickyAssignor。这种设置完全是在客户端测,所以仅仅更新客户端版本即可。而在Kafka Stream中,这个功能是默认开启的。

Tip 4:掌握命令行工具

下面介绍了4种在平时工作中使用最多的工具。

kafka console producer

1
2
3
4
# 开启发送者程序, 发送的消息只有value,没有key
kafka-console-producer --topic <topic> --broker-list <broker-host:port>
# 发送消息,发送的消息包含key 和 value
kafka-console-producer --topic <topic> --broker-list <broker-host:port> --property parse.key=true --property key.separator=":"

kafka console consumer

1
2
3
4
5
6
# 消费指定topic中的消息
kafka-console-consumer --topic <topic> --bootstrap-server <broker-host:port>
# 指定从开始的地方消费
kafka-console-consumer --topic <topic> --bootstrap-server <broker-host:port> --from-beginning
# 默认情况下consumer只会打印消息的value,如果想要打印消息的key,则输入下面命令
kafka-console-consumer --topic <topic> --bootstrap-server <broker-host:port> --property print.key=true --property key.separator=":"

Dump log

1
2
3
# 指定打印topic为example-0中的日志,参数--print-data-log表示输出日志
# 不过一般在生产环境中不会使用这个命令
kafka-dump-log --print-data-log --files ./var/lib/kafka/data/example-0/00000000000000000000.log

delete records

kafka提供了配置来控制数据保留,包括时间和数据大小

  • 数据保留的时间由 log.retention.hours 控制,默认值是168hour,也就是一周
  • configuration.log.retention.bytes 控制segment文件最大是多少。默认值是-1, 也就是不限制大小

如果想要删除数据,可以使用下述命令:

1
2
kafka-delete-records --bootstrap-server <broker-host:port> \
--offset-json-file offsets.json

offsets.json 文件内容如下:
1
2
3
4
5
6
{
"partitions": [
{"topic": "example", "partition": 0, "offset": -1}
],
"version":1
}

参数介绍如下:

  • topic:指定要删除数据对应的topic
  • partition:指定需要删除数据对应的partition
  • offset:指定从哪个offset开始删除,注:是删除offset之前的数据。-1表示删除当前HW之前的数据,HW(high watermark)表示能够开始消费的位置

Tip5:使用record headers的能力

Record headers可以给kafka消息添加一些元数据,并且不是给消息的key value添加额外的信息。比如如果你想要在消息中嵌入一些信息,如表示消息来源系统,也是是想要增加一些审计功能。
为什么不能将这些额外的数据添加到key中。因为给key中添加数据会带来两个潜在的问题

  1. 首先,如果你使用的是压缩主题,那么给key添加信息会使得消息不正确。这样压缩不会像之前起作用
  2. 其次,给key添加额外的信息有可能会影响数据的partition分布

回顾

我们了解了kafka的五个tips,我们理解了下面的知识点

  1. 消息持久性以及和消息传递之间的关系
  2. producer API中的sticky partitioner
  3. command line tools
  4. record headers的能力

java-zero-copy

发表于 2023-12-04 | 分类于 linux

本篇内容主要翻译自Efficient data transfer through zero copy,包括有些自己的思考

java zero copy

很多网页应用有大量的静态内容,针对这些静态内容主要的处理逻辑就是从磁盘读取数据,然后将数据写入到响应的socket中。这项工作应该需要较少的CPU资源。但是有时候并不是。内核从磁盘读取数据,然后将数据拷贝到应用中。之后应用将数据写入到内核,然后推送到socket中。实际上,应用程序在这里扮演了一个无效率的中间层,既将数据从磁盘写入到socket。
每一次当数据在用户空间和内核空间转换时,数据都会被拷贝,而这会消耗cpu cycles以及内存带宽。幸运的是,我们可以采用zero copy技术来避免内核和应用程序之间的数据拷贝。应用程序使用zero copy技术来请求内核直接将数据从磁盘文件拷贝到socket中,而不需要经过应用程序。zero copy技术能够极大的提升应用程序性能并且减少内核空间和用户空间之间的切换。
Java中使用java.nio.channels.FileChannel中的transferTo()方法在linux和Unix系统重实现zero copy。使用transferTo()方法能够直接将字节数据从一个channel写入到另一个channel中,而数据不需要经过应用程序。本篇文章首先展示使用传统的拷贝方法消耗的资源,然后展示使用zero copy获得的性能提升。

数据传输:传统方法

想想一个简单的场景,从一个文件读取数据,然后将数据通过网络写入到另一个程序中。核心操作如下所示。

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

虽然看起来很简单,但是这个操作需要再内核空间和用户空间4次上下文切换,以及4次数据拷贝。下图展示了具体过程

下图展示了上下文切换过程

主要的步骤如下:

  1. read()方法从user mode 转换到 kernel mode。在read内部是发一起了一次系统调用sys_read()从文件读取数据。第一次数据拷贝是由DMA引擎来执行,DMA从磁盘读取文件,然后将数据保存到内核缓冲区中。
  2. 请求的数据被从内核缓冲区拷贝到用户缓冲区,read()方法返回。这引起了第二次的上下文切换。现在数据是在用户空间中的缓冲区中。
  3. send()方法调用引起用户空间到内核空间的切换。这次会将数据从用户空间拷贝到内核缓冲区。这一次数据是放置到另外一个内核缓冲区中,与目的socket相关联。
  4. send()方法返回,又引起了一次上下文切换。DMA将数据从内核缓冲区拷贝到网卡缓冲区中,这是第四次数据拷贝。

我们为什么不直接将数据拷贝到用户空间,而要经过内核空间呢?首先因为用户进程没有办法直接读取硬盘,这是基于保护的目的,都要经过操作系统的处理,才能读取到硬盘内容;其次这是因为操作系统引入内核缓冲区是为了提升性能。操作系统读取数据都会预读取一些数据,这样在应用程序读取额外的数据时,可以不用发起系统调用从硬盘中读取,而是直接从内核缓冲区获取即可。而写入过程,可以完全实现为异步过程。既应用程序只需要将数据写入到内核缓冲区中,而不是写入到磁盘中。
但是,设想当前应用程序需要处理的数据要远远大于内核空间缓冲区的大小。而此时,数据需要在磁盘,内核缓冲区,用户空间中来回拷贝。这会严重影响性能。
Zero copy技术是解决这个问题的方法。

数据传输:zero copy方法

如果你仔细检查上面的过程,你会发现第二次和第三次数据拷贝可以省略。应用程序针对这些数据什么也不做。因此数据可以被直接从内核缓冲区拷贝到socket buffer中。transferTo()方法可以完成这个操作。下面展示了此方法

1
public void transferTo(long position, long count, WritableByteChannel target);

transferTo()方法将数据从文件channel拷贝到target channel中。这个方法依赖底层操作系统对于zero copy的支持。在UNIX或linux中,使用的是sendfile()系统调用。如下所示:
1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

下图展示了transferTo()方法执行过程

下图展示了具体的上下文切换

transferTo()方法执行过程如下:

  1. DMA引擎将文件内容拷贝到内核缓冲区。然后数据从内核缓冲区拷贝到socket buffer中。(涉及到两次数据拷贝)
  2. 第三次数据拷贝发生在DMA引擎将数据从socket buffer拷贝到网卡中。

我们将上下文切换从4次降低到2次,数据拷贝从4次降低到3次(其中仅有一个数据拷贝需要CPU参与,图中序号2)。但是这还没有达到zero copy的目的。如果底层网卡支持gather操作,那么我们可以减少内核空间中的数据重复。在linux kernel2.4及以后版本中,socket buffer已经支持了这个操作。这个方法不仅仅减少了上下文切换并且也消除了CPU参与的数据拷贝。具体如下:

  1. transferTo()方法将文件内容拷贝到内核缓冲区,由DMA引擎执行
  2. 不需要将数据拷贝进socket buffer中,仅仅将数据的位置以及数据长度添加到socket buffer中。DMA引擎直接将kernel buffer中的数据拷贝到网卡中。
    下图展示了包含gather操作的transferTo()

性能比较

使用java实现文件传输,采用传统的IO和nio来分别实现进行对比。完整代码参考.其中客户端是主要的视线,服务端仅仅读取数据。

传统IO client 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 1. create socket and connect to server
try {
socket = new Socket(Common.SERVER, port);
System.out.println("Connected with server " + socket.getInetAddress() + ":" + socket.getPort());
} catch (UnknownHostException e) {
System.out.println(e);
System.exit(Common.ERROR);
} catch (IOException e) {
System.out.println(e);
System.exit(Common.ERROR);
}

// 2. send data to server
try {
inputStream = Files.newInputStream(Paths.get(fileName));
output = new DataOutputStream(socket.getOutputStream());
long start = System.currentTimeMillis();
byte[] b = new byte[4096];
long read = 0;
long total = 0;
// read function cause user mode to kernel mode,
// and DMA engine read file content from disk to kernel buffer
// then copy kernel buffer to the b array. This cause another context switch
// then when read return, cause kernel mode to user mode
// Summary: two context switch, two copy(one cpu copy)
while ((read = inputStream.read(b)) >= 0) {
total = total + read;
System.out.println("total size:" + total);
// write function cause user mode to kernel mode,
// and copy data from b array to socket buffer,
// then DMA engine copy socket buffer to nic(network interface) buffer
// then when write return, cause kernel mode to user mode,
// Summary: two context switch, two copy(one cpu copy)
output.write(b);
}
System.out.println("bytes send: " + total + " and totalTime(ms):" + (System.currentTimeMillis() - start));
} catch (IOException e) {
System.out.println(e);
}

nio client 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void testSendfile() throws IOException  {
// 1. get file size(bytes)
Path path = Paths.get(fileName);
long fsize = Files.size(path);

SocketAddress sad = new InetSocketAddress(Common.SERVER, port);
SocketChannel sc = SocketChannel.open();
sc.connect(sad);
sc.configureBlocking(true);
FileInputStream fis = new FileInputStream(fileName);
FileChannel fc = fis.getChannel();
long start = System.currentTimeMillis();
long curnset = 0;

// in linux kernel 2.4 and later
// transferTo() function cause user mode to kernel mode
// DMA engine copy data from disk to kernel buffer
// then just copy data position and data length to kernel buffer
// then DMA engine copy kernel buffer to NIC buffer
// when transferTo return, cause another context switch
// Summary: two context switch, two copy(zero CPU copy)
curnset = fc.transferTo(0, fsize, sc);
System.out.println("total bytes transferred: " + curnset + " and time taken in MS: " +
(System.currentTimeMillis() - start) );

fc.close();
fis.close();
}

性能比较如下:

file size traditional(ms) nio(ms)
12MB 50 18
221MB 690 314
2.5G 15496 2610

评测环境:

  1. java : openjdk version “1.8.0_382”, OpenJDK Runtime Environment (build 1.8.0_382-b05), OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
  2. linux: CentOS Linux release 7.6 (Final)
  3. kernel: 4.14.0_1-0-0-51

Read/Write locks in java(java中的读写锁)

发表于 2019-05-17 | 分类于 java

本篇文章主要介绍读写锁的一些原理及实现。翻译原文地址

Java中的读写锁

假设一个java应用程序需要读以及写一些资源,但是写的频率要远远低于读。多个读线程读取资源不会有什么问题。但是如果一个线程想要写资源,那么同时就不能有其他线程读或写这个资源。为了能够允许多个读线程和一个写线程,我们需要读写锁。
虽然Java中提供了读写锁的实现,但是我们还是要知道读写锁背后的原理,这样才能在实际使用中处理具体的问题。

Java中实现读写锁

总结一下对于获取资源的读权限和写权限可以如下:
读权限:当没有其他线程写当前资源或者没有其他线程请求当前资源的写权限,那么当前线程就能够获取资源的读权限
写权限:如果没有其他线程读或者写当前资源,当前线程就能获取资源的写权限
只要没有其他线程正在写资源或者没有其他线程请求写资源,那么当前线程就能够读取资源。如果读线程发生的很多,但是又没有提升写线程的优先级,那么就可能发生”饥饿”现象。

可重入读写锁

可重入读锁

可重入写锁

读锁升级到写锁

写锁降级到读锁

可重入读写锁的完整实现

在finally中调用unlock

Thread Signaling

发表于 2019-05-14 | 分类于 java

线程信号量及wait,notify方法

本篇主要介绍线程之间如何进行信号的通知。同时介绍wait,notify底层的一些实现。

通过共享对象进行信号通知

最简单的进行线程之间通知的方式就是采用共享变量的方式。比如下面的代码

1
2
3
4
5
6
7
8
9
public class MySignal{
protected boolean hasDataToProcess = false;
public synchronized boolean hasDataToProcess(){
return this.hasDataToProcess;
}
public synchronized void setHasDataToProcess(boolean hasData){
this.hasDataToProcess = hasData;
}
}

线程A和线程B共享同一个MySingal实例,当线程A处理好数据后,可以设置hasDataToProcess属性为true,然后线程B获取到此属性。从而完成线程之间的信号通知。当然,如果线程A和线程B不是在同一个MySingal实例上进行的,则不能进行信号的传递。

忙等待

在采用MySingal的例子中,一般会采用下面的代码来判断一个线程是否可以进行处理了。

1
2
3
4
protected MySignal sharedSignal = new MySingal()
while(!sharedSignal.hasDataToProcess()){
//do nothing... busy waiting
}

从代码中可以看到,检测hasDataToProcess属性是在一个while循环中,如果hasDataToProcess为false,这就会造成线程一直在执行while语句,造成忙等待。这会造成CPU资源的浪费。

wait,notify,notifyAll使用

一般在Java中,我们一般会采用wait,notify或notifyAll进行线程之间信号的传递。线程调用某一个对象上的wait方法前,必须首先获取该对象上的锁,才能执行此对象上的wait方法。在调用notify或者notifyAll之前,也是要获取对应对象上的锁。调用wait方法时,会释放此对象上的锁,线程进入阻塞状态,等待信号。而调用notify后,会随机唤醒一个同对象上的线程,但是必须是退出了notify对应的synchronized块后,被唤醒的线程才能继续执行,因为被唤醒的线程还要获取对象上的锁。如下面的代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MonitorObject{
}
public class MyWaitNotify{
MonitorObject myMonitorObject = new MonitorObject();

public void doWait(){
synchronized(myMonitorObject){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
}

public void doNotify(){
synchronized(myMonitorObject){
myMonitorObject.notify();
}
}
}

从代码中,可以看到在myMonitorObject上调用wait方法之前,会先获取myMonitorObject上的锁。在调用notify方法之前,也是要先获取myMonitorObject上的锁。在下面的内容中我会简单介绍wait和notify的底层原理。

wait notify notifyAll在Object源码中的介绍

在JDK1.8中,Object对象中有三个wait(),wait(long timeout),wait(long timeout, int nanos)这三个方法,它们的主要区别是后两个方法增加了等待的时间。
下面是JDK1.8中关于wait(long timeout)方法的描述:

/**

 * Causes the current thread to wait until either another thread invokes the
 * {@link java.lang.Object#notify()} method or the
 * {@link java.lang.Object#notifyAll()} method for this object, or a
 * specified amount of time has elapsed.
 * <p>
 * The current thread must own this object's monitor.
 * <p>
 * This method causes the current thread (call it <var>T</var>) to
 * place itself in the wait set for this object and then to relinquish
 * any and all synchronization claims on this object. Thread <var>T</var>
 * becomes disabled for thread scheduling purposes and lies dormant
 * until one of four things happens:
 * <ul>
 * <li>Some other thread invokes the {@code notify} method for this
 * object and thread <var>T</var> happens to be arbitrarily chosen as
 * the thread to be awakened.
 * <li>Some other thread invokes the {@code notifyAll} method for this
 * object.
 * <li>Some other thread {@linkplain Thread#interrupt() interrupts}
 * thread <var>T</var>.
 * <li>The specified amount of real time has elapsed, more or less.  If
 * {@code timeout} is zero, however, then real time is not taken into
 * consideration and the thread simply waits until notified.
 * </ul>
 * The thread <var>T</var> is then removed from the wait set for this
 * object and re-enabled for thread scheduling. It then competes in the
 * usual manner with other threads for the right to synchronize on the
 * object; once it has gained control of the object, all its
 * synchronization claims on the object are restored to the status quo
 * ante - that is, to the situation as of the time that the {@code wait}
 * method was invoked. Thread <var>T</var> then returns from the
 * invocation of the {@code wait} method. Thus, on return from the
 * {@code wait} method, the synchronization state of the object and of
 * thread {@code T} is exactly as it was when the {@code wait} method
 * was invoked.
 * <p>
 * A thread can also wake up without being notified, interrupted, or
 * timing out, a so-called <i>spurious wakeup</i>.  While this will rarely
 * occur in practice, applications must guard against it by testing for
 * the condition that should have caused the thread to be awakened, and
 * continuing to wait if the condition is not satisfied.  In other words,
 * waits should always occur in loops, like this one:
 * <pre>
 *     synchronized (obj) {
 *         while (&lt;condition does not hold&gt;)
 *             obj.wait(timeout);
 *         ... // Perform action appropriate to condition
 *     }
 * </pre>
 * (For more information on this topic, see Section 3.2.3 in Doug Lea's
 * "Concurrent Programming in Java (Second Edition)" (Addison-Wesley,
 * 2000), or Item 50 in Joshua Bloch's "Effective Java Programming
 * Language Guide" (Addison-Wesley, 2001).
 *
 * <p>If the current thread is {@linkplain java.lang.Thread#interrupt()
 * interrupted} by any thread before or while it is waiting, then an
 * {@code InterruptedException} is thrown.  This exception is not
 * thrown until the lock status of this object has been restored as
 * described above.
 *
 * <p>
 * Note that the {@code wait} method, as it places the current thread
 * into the wait set for this object, unlocks only this object; any
 * other objects on which the current thread may be synchronized remain
 * locked while the thread waits.
 * <p>
 * This method should only be called by a thread that is the owner
 * of this object's monitor. See the {@code notify} method for a
 * description of the ways in which a thread can become the owner of
 * a monitor.
 *
 * @param      timeout   the maximum time to wait in milliseconds.
 * @throws  IllegalArgumentException      if the value of timeout is
 *               negative.
 * @throws  IllegalMonitorStateException  if the current thread is not
 *               the owner of the object's monitor.
 * @throws  InterruptedException if any thread interrupted the
 *             current thread before or while the current thread
 *             was waiting for a notification.  The <i>interrupted
 *             status</i> of the current thread is cleared when
 *             this exception is thrown.
 * @see        java.lang.Object#notify()
 * @see        java.lang.Object#notifyAll()
 */
public final native void wait(long timeout) throws InterruptedException;

从代码的注释中,我们可以获得以下一些内容:

  1. 调用wait的线程一定要获取对象上的monitor,在调用wait后,会释放该对象上的锁。
  2. 发生以下四种情况线程会被唤醒:
    • 其他的线程在相同的对象上调用了notify
    • 其他的线程在相同的对象上调用了notifyAll
    • 其他的线程终止了当前线程(interrupt方法),会扔出InterruptedException异常
    • 指定的最大等待时间到了
  3. 当线程被唤醒时,当前的线程会从对象的等待集合中移除,重新进入线程调度阶段。之后会跟其他线程竞争获取对象上的锁。
  4. 线程可能在没有上述四种情况发生的时候,被唤醒。这被称为伪唤醒,下面会有详细介绍。

浅析Object monitor的底层原理

在JVM实现获取对象上的锁,是通过monitor进行实现的。图示如下:

当一个线程需要获取 Object 的锁时,会被放入 EntrySet 中进行等待,如果该线程获取到了锁,成为当前锁的 owner。如果根据程序逻辑,一个已经获得了锁的线程缺少某些外部条件,而无法继续进行下去(例如生产者发现队列已满或者消费者发现队列为空),那么该线程可以通过调用 wait 方法将锁释放,进入 wait set 中阻塞进行等待,其它线程在这个时候有机会获得锁,去干其它的事情,从而使得之前不成立的外部条件成立,这样先前被阻塞的线程就可以重新进入 EntrySet 去竞争锁。

notify和notifyAll区别

乍一看,notify和notifyAll的区别很简单,就是notify只能随机选择一个处于等待状态的线程进行唤醒;而notifyAll可以唤醒所有处于等待状态下的线程,但是也是只有一个线程能够继续执行。
如果结合上面的图片,我们就能更好地进行理解。当线程在对象上调用notify方法时,随机选择一个处于等待状态的线程,并且把该线程放置在该对象的EntrySet列表中。而如果调用的是notifyAll方法时,所有的处于等待状态下的线程都会进入到EntrySet中,从而多个线程进行对象上的锁。notify有点类似于网络中的单播,而notifyAll类似于多播。

丢失信号

设想一下这种情况,有一个线程首先调用了notify方法,然后其他的线程调用了wait方法。那么处于wait下的线程将不会接受到之前线程发送的notify信号。如下面代码所示,代码地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class WaitNotifyMissSingal {
public static void main(String[] args) {
final Object lock = new Object();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread B is waiting to get lock");
synchronized (lock) {
System.out.println("thread B get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.notify();
System.out.println("thread B do notify method");
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread A is waiting to get lock");
synchronized (lock) {
try {
System.out.println("thread A get lock");
TimeUnit.SECONDS.sleep(1);
System.out.println("thread A do wait method");
lock.wait();
System.out.println("wait end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}

运行上面的代码,程序会一直运行下去。那么如何解决这个问题呢,其实在Object的wait的方法注释中也有对应的说明。我们可以把通知信号保存在信号类的成员变量中。代码地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyWaitNotify2{
MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;

public void doWait(){
synchronized(myMonitorObject){
if(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}

public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}

在上面的代码中,将信号保存在了wasSingalled变量中,只要调用了notify方法,就是将wasSignalled设置为true,表示有线程执行了notify。在doWait方法中,如果wasSignalled为false,则当前的线程会执行wait方法,进入等待状态;而当wasSignalled为true时,不会执行wait方法,只会将wasSignalled设置为false。

伪唤醒

因为一些底层操作系统的原因,具体的可以查看unix操作系统相关内容。简单理解,就是当线程没有接受到唤醒信号时,而线程被错误的唤醒。为了防止伪唤醒,一定要在while循环中检查信号变量的值。这样的循环也叫自旋锁。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyWaitNotify3{

MonitorObject myMonitorObject = new MonitorObject();
boolean wasSignalled = false;

public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}

public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}

不要在常量字符串或全局对象上调用wait方法

请看下面的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyWaitNotify{

String myMonitorObject = "";
boolean wasSignalled = false;

public void doWait(){
synchronized(myMonitorObject){
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}

public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}

从代码中我们可以看到,在上锁时是对myMonitorObject对象上锁,也是在myMonitorObject上调用wait方法,而myMonitorObject是一个空的字符串。我们知道,JVM会将相同的字符串看成同一个对象。也就说说,如果有两个MyWaitNotify实例,则这两个实例中的myMonitorObject是指向同一个对象的。那么一个在实例MyWaitNotify上调用wait的线程会被在另一个实例MyWaitNotify上调用doNotify方法的线程唤醒。下面的图表示了实例的成员指向了相同的对象。

在上图中,4个线程是在相同的String常量上调用wait和notify方法,但是信号wasSignalled仍然是单独存放在对应的实例对象上。即一个在MyWaitNotify1实例上调用doNotify方法的线程可能会唤醒在MyWaitNotify2实例上等待的线程,但是唤醒信号仍然是单独保存在MyWaitNotify1实例中。
如果仔细看上面的程序,发现当在第二个MyWaitNotify2实例上调用doNotify方法时,会唤醒线程A或者B,但是由于在while循环中的wasSignalled变量,对于MyWaitNotify1实例仍然是false。所以被唤醒的线程A或者B从wait启动,但是会再次进入while循环调用wait,再次进入阻塞状态。这跟伪唤醒很像。
由于在doNotify方法中调用的notify方法,此方法不像notifyAll方法,notify方法只会唤醒一个线程,如果是线程C调用的doNotify,本来想唤醒的是线程D。但是有可能会错误的唤醒线程A或B,并且线程A或B会修改对应实例上的wasSignalled变量。而发给D的信号就丢失了,就有点像信号丢失的情况。如果将doNotify方法中的notify替换成notifyAll就不会有这个问题。但是这是一个坏主意,当应用仅仅只需要唤醒一个线程时,没有任何理由要把所有的线程都唤醒。
注意,对于wait/notify的情况,永远不要使用全局的对象例如string常量。在wait和notify上使用的对象针对对应的实例必须是独一的。

参考

  1. http://tutorials.jenkov.com/java-concurrency/thread-signaling.html
  2. http://www.php.cn/java-article-410323.html

java synchronized keyword

发表于 2019-05-10 | 分类于 java

java synchronized关键字

在平常进行java并发程序的开发过程中,synchronized关键字的出现频率很高。但是synchronized底层是如何实现的,synchronized都有哪些具体的用法。本篇将会在下面进行详细讲解。
synchronized关键字主要是用来进行同步操作。synchronized关键字修饰的内容,每次都只能有一个线程进入,如果其他线程想要进入相同的代码块,那么必须等前一个线程释放代码块对应的锁,其他的线程才能进入此代码块。但是同一个线程能够多次进入一个相同的同步块,也就是synchronized具有可重入锁的特性。
总的来说,在java中,主要在三个地方使用synchronized关键字

  1. 类的实例方法
  2. 类的静态方法
  3. 修饰部分代码块

synchronized用在实例方法上

1
2
3
4
5
6
public class MyClassInstance {
private int count;
public synchronized void add(int value){
this.count += value;
}
}

在上面的代码中展示了,synchronized应用在实例方法的方式。当synchronized用在实例方法上时,每当线程进入此方法时,会尝试获取对应的类实例上的锁(注意是类实例)。如果没有其他的线程持有此实例上的锁,那么线程会获取此实例锁,然后运行此方法。
下面的图显示了使用javap进行反编译后的内容(运行:javap -v MyClassInstance.class):

图中仅仅展示了add方法反编译后的内容,可以看到在方法的flags标记中出现了ACC_SYNCHRONIZED。这个就是表明前面方法是用synchronized关键字修饰的。这个跟同步代码块的反编译结果是不同的。但是在线程执行同步操作时,都是要获取对应对象上的锁。

synchronized用在类的静态方法上

1
2
3
4
5
public class MyClassStatic {
public static synchronized void add(int value) {
System.out.print(value);
}
}

上面代码展示了如何在静态方法上使用synchronized关键字。当线程尝试进入类的静态方法时,会尝试获取类上的锁(注意是类上的锁,跟类实例上的锁不同的东西)。如果没有其他线程持有此类上的锁,那么当前线程会获取此类上的锁,然后运行此方法。
下面展示了反编译的结果(运行命令:javap -v MyClassStatic.class)

可以从反编译的结果中看到在方法的flags中也是包含了ACC_SYNCHRONIZED.

synchronized用在代码块上

如果synchronized关键字用在代码块上,会在其之后括号中的对象获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
public class MyClass {
public void log(String msg) {
synchronized(this) {
System.out.println(msg);
}
}
public static void func(String msg) {
synchronized(MyClass.class) {
System.out.print(msg);
}
}
}

在上面的代码中,在方法log中,当线程进入log方法,然后执行synchronized关键字修饰的代码块。线程会尝试获取当前类实例上的锁,因为括号中使用的是this关键字。而在func方法中,线程会尝试获取MyClass类的锁。注意这两个锁是不同的。所以一个线程可以执行log中的同步代码块,而同时另一个线程也可以执行func中的同步代码块。
反编译结果如下(运行命令:javap -v MyClass.class):

可以看到在方法的签名中是没有ACC_SYNCHRONIZED的。但是在代码中出现了monitorenter和monitorexit。
monitorenter:

Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

引用中的内容来自JVM规范。这段话的大概意思为:
每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

  1. 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
  2. 如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
  3. 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。

monitorexit:

The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

这段话的大概意思为:
执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
通过这两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。
一个monitorenter都会对应有一个monitorexit。但是我们从反编译的结果中,可以看到多出了一个monitorexit,即第18行。因为在synchronized中的代码遇到异常时,会释放锁。第一个 monitorexit 指令如果正确执行,会走到下面的 goto 指令,直接跳转到 21 行 return,而如果发生异常,下面的 astore_3 和 aload_2 指令会继续执行异常问题,下一步会继续执行 monitorexit 指令退出同步。

当然,有时候,我们也可以这样用synchronized关键字修饰代码块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyClass2 {
private Object obj = new Object();
private static Object obj2 = new Object();
public void log(String msg) {
synchronized(obj) {
System.out.print(msg);
}
}
public static void func(String msg) {
synchronized(obj2) {
System.out.print(msg);
}
}
}

在上面的代码中,log方法中的同步块是对obj实例进行加锁,注意每个MyClass2实例中的obj都是不同的。而func中的同步块是对obj2静态成员进行加锁。

Java Memory Model

发表于 2019-05-06 | 分类于 java

Java内存模型规定了Java虚拟机如何跟计算机的内存协同工作。因为Java虚拟机模拟了计算机,所以自然Java虚拟机包括内存模型。

正确理解Java内存模型对于编写正确的并发程序非常重要。Java内存模型规定了线程何时以及怎么读取其他线程写的值,还有就是在获取共享变量时如何进行同步操作。

最初的Java内存模型是有缺陷的,因此在Java5中进行了修改,并且这个版本的Java内存模型一直到Java8都在使用。

Java内存模型

JVM中的内存模型将内存分为线程栈内存和堆内存。下面的图从逻辑上展示了Java内存模型:

每个在JVM中运行的线程都有它自己栈空间。线程的栈空间中包含了线程调用方法执行到那一刻的数据。随着线程执行它的代码,调用栈也随之改变。

线程的栈空间同样也包含每个执行的方法的局部变量。线程只能获取它自己的栈空间,其中包含的局部变量对于其他线程是不可见的。即使两个线程执行相同的代码,这两个线程也是在各自的线程栈空间中创建各自的局部变量。

所有的原型类型(boolean,byte,short,char,int,long,float,double)的局部变量都是存储在线程的栈空间中。一个线程可能传递一个原型变量的副本给另一个线程,但是另一个线程并不能共享这个原型变量。

不管哪个线程创建了对象,这些对象都是存储在堆空间中。这也包括原型类型的包装器类型(e.g. Byte, Integer)。不管对象是被分配给局部变量还是作为另一个对象的成员变量,这个对象都是存储在堆空间中。

下面的图中说明了调用栈和局部变量存储在线程的栈空间中,而对象存储在堆空间中:

如果局部变量是原型类型,那么这个变量在线程的栈空间中。

如果局部变量是一个指向对象的引用类型,那么这个引用是在线程的栈空间,但是对象本身是在堆空间中。

如果一个对象包含方法,同时这些方法包含局部变量。那么这些局部变量(原型类型或者引用)是保存在线程栈空间中,即使这些方法所在的对象是存储在堆空间中。

一个对象的成员变量是跟对象一起存储在堆内存中,不管这个成员变量是原型还是指向一个对象的引用。

类的静态变量也是跟类的定义一起存在堆内存中。

堆空间中对象能够被所有拥有指向此对象的引用的线程访问到。当一个线程能够访问一个对象时,那么这个线程也能够访问此对象的成员变量(这里要看这个对象的封装性)。如果两个线程同时调用了同一个对象的一个方法,那么这两个线程将能够访问这个对象的成员变量,但是每个线程都会获得局部变量的一份拷贝。

下面的图说明了上面描述的内容:

在上图中,两个线程有一系列的局部变量。其中一个局部变量(Local Variable 2)指向了堆内存上的共享对象(Object3)。这两个线程分别拥有一个指向同一个对像的引用,这两个引用是不同的。这些引用是局部变量,所以存储在各自线程的栈空间中。而这两个不同的引用指向了堆上的同一个对象。

注意到共享对象(Object3)有两个引用指向了Object2和Object4,如图中箭头所示。通过Object3中的成员变量引用,这两个线程能够获取Object2和Object4。

The diagram also shows a local variable which point to two different objects on the heap. In this case the references point to two different objects (Object 1 and Object 5), not the same object. In theory both threads could access both Object 1 and Object 5, if both threads had references to both objects. But in the diagram above each thread only has a reference to one of the two objects.

上图中同样了展示了一个局部变量指向堆上两个不同的对象。指向不同对象(Object1, Object5)的引用不是同一个对象。理论上,如果这两个线程有指向这两个对象的引用,那么这两个线程都能够访问Object1和Object5。但是在上图中每个线程只有指向其中一个对象的引用。

那么,在Java代码中如何反应上面的内存图呢?代码很简单,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MyRunnable implements Runnable() {
public void run() {
methodOne();
}
public void methodOne() {
int localVariable1 = 45;

MySharedObject localVariable2 =
MySharedObject.sharedInstance;

//... do more with local variables.

methodTwo();
}
public void methodTwo() {
Integer localVariable1 = new Integer(99);

//... do more with local variable.
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MySharedObject {

//static variable pointing to instance of MySharedObject

public static final MySharedObject sharedInstance =
new MySharedObject();


//member variables pointing to two objects on the heap

public Integer object2 = new Integer(22);
public Integer object4 = new Integer(44);

public long member1 = 12345;
public long member1 = 67890;
}

如果两个线程执行run()方法,那么此代码就表明了上图所示的内存分布。run方法首先调用methodOne方法,然后methodOne方法调用methodTwo方法。

在methodOne方法中定义了一个原型的局部变量localVariable1,同时定义了一个指向对象的引用localVariable2.

每个执行methodOne方法的线程都会在各自的线程栈空间中创建localVariable1和localVariable2的副本。localVariable1在两个线程中是完全独立的,仅仅存在于对应线程的栈空间中。一个线程不能看到其他线程对于localVariable1变量的修改。

执行methodOne方法的线程同样会创建localVariable2的副本。但是,这两个localVariable2的副本是指向在堆上的同一个对象。而localVariable2引用指向的对象是一个类的静态成员变量。而类的静态变量在堆中只存在一份。所以两个localVariable2指向同一个MySharedObject对象的实例,MySharedObject实例存储在堆内存上,对应图中的Object3对象。

我们在来看看methodTwo方法是如何创建localVariable1局部变量的。这个局部变量是指向一个Integer对象的引用。methodTwo方法每次都重新创建一个Integer实例。局部变量localVariable2引用是存储在对应的栈空间中,而对应的Integer对象是在堆内存中,因为每次执行methodTwo方法都会创建Integer对象,所以在堆内存中会有两个Integer对象。即对应于图中的Object1和Object5对象。

在MySharedObject中有两个long类型的局部变量,因为这些变量是类的成员变量,所以它们跟具体的对象一起保存在堆内存中。仅仅局部变量是保存在栈内存中的。

硬件内存架构

硬件内存跟Java内存模型是有些不一样的。为了更好地理解Java内存模型,我们需要了解硬件内存架构。下面的图简单描述了现代计算机的硬件架构:

现代的计算机通常会有两个以上的CPU,同时这些CPU可能有多个核。这也意味着我们可以将多个线程同时运行在多个CPU上。在给定的时间点,每个CPU都能运行一个线程。即如果你的Java程序是多线程的,那么能够在多个CPU上同时运行(并行)。

在上图中我们可以看到每个CPU内部都有一系列的寄存器。CPU在寄存器上执行的操作要比在主存中的操作快。这是因为CPU能够更快的获取寄存器上的内容。

每个CPU都有一个对应的缓存内存。获取缓存中的内容要快于获取主存中的内容,但是没有获取寄存器中的内容快。CPU缓存的速度要介于寄存器和主存之间。有些CPU可能会有多级的缓存。

一个计算机包含一个主存(RAM),所有的CPU都能够获取主存中的内容。主存的容量要远远大于缓存的容量。

如果一个CPU要读取主存的内容,通常只会读取主存中部分区域的内容到CPU缓存中,然后在从缓存读取到寄存器中,之后进行计算。当CPU需要写结果到主存中,它会将寄存机中的值刷新到缓存中,然后在之后的某个时间点,在将缓存中的内容刷新到主存中。

当CPU需要存储缓存中的值时,会将缓存中的值刷新到主存中。同时CPU缓存也可以局部的刷新缓存值以及写出缓存值。

Java内存模型和硬件内存架构之间的桥接

像前面说明的,Java内存模型和硬件内存架构是不同的。硬件内存架构不会区分线程栈和堆空间。在硬件中,线程栈和堆空间都是在主存中的。同时,部分线程栈和堆内存会出现在CPU缓存或者CPU寄存器中。下面的图进行了说明:

当对象和变量存储在不同的内存区域时,会出现一些问题。主要的两个问题如下:

  1. 共享变量的可见性(可见性,即一个线程能够及时的看到另一个线程对于共享变量的修改)
  2. 竞态条件(即读取,检查,写入共享变量)

下面会依次介绍这两个问题。

共享变量的可见性

如果两个线程共享一个对象,但是没有采用合适的volatile关键字或者同步操作,那么当一个线程更新共享对象时,可能另一个线程并不能看到更新后的值。

想象一下,一个共享对象最初是在主存中,一个CPU上的线程读取了此对象到CPU缓存中,之后对于这个共享对象进行了修改。只要CPU缓存没有刷新到主存中,那么运行在其他CPU上的线程是不能读取到修改后的共享变量的值的。这会造成其他CPU只能看到修改之间的值。

下面的图说明了这种情况。在左边的CPU上运行的线程将共享对象读取到CPU缓存中,然后将它的count变量修改为2.由于没有将count的修改刷新到主存中,所以在右侧CPU上运行的线程不能看到这个修改。

为了解决这个问题,我们可以使用Java中的volatile关键字.volatile关键字的作用是保证每次都是从主存中读取变量的值,同时如果修改了此变量,那么此变量会立刻写回到主存中。

竞态条件

如果两个线程或多个线程共享一个对象,那么当多余一个线程修改此变量时,竞态条件就可能会出现。

想象一下,线程A将一个共享对象的count变量读取进CPU缓存,线程B也将count读取到另一个CPU缓存。现在线程A在count加上1,同时线程B也在count上加上1.现在变量被增加了两次,在每个CPU缓存中各一次。

如果加1操作是顺序进行的,那么count的值会加上2,然后写回到主存中。

但是,这两个操作是在没有正确同步的情况下同时进行的。尽管线程A或者B会将count修改后的值写回到主存,但是更新后的值始终比原来的值大1.

下面的图说明了上面描述的竞态条件:

为了解决这个问题,可以使用Java的同步块,即synchronized关键字。同步块保证了在同步块中获取到的变量都是从主存中读取的,并且当线程离开同步块时,所有修改的变量会被刷新到主存中,而不管这个变量是否被volatile声明。

12…4>

37 日志
22 分类
45 标签
RSS
© 2024 liang
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4