Kafka storage

Kafka relies heavily on the filesystem to store and cache messages. There is a common belief that "disks are slow", which makes people doubt that a disk-backed design can perform well. In reality, whether a disk is fast or slow depends entirely on how it is used.

These days, linear (sequential) writes to a 7200rpm SATA RAID-5 array can reach roughly 600 MB/sec, while random writes (seek, then write) only manage about 100 KB/sec. Linear reads and writes are the most predictable of all access patterns and are heavily optimized by the operating system, which provides read-ahead and write-behind. In fact, sequential disk access can be faster than random memory access.

So linear (sequential) disk access is often much faster than random memory access, and it also lends itself well to persistence.

Keeping the data in JVM memory has the following drawbacks:

  • The memory overhead of objects is very high, often doubling (or more) the size of the stored data
  • As the amount of data on the heap grows, GC becomes slower and slower and may even cause errors

Building on the operating system's filesystem instead has the following advantages:

  • The OS page cache makes effective use of main memory; because the data is compact, a large amount of it can be cached, with no GC pressure
  • Even if the service restarts, the cache stays warm (no warm-up needed). An in-process cache, by contrast, must be warmed up by the application, which takes a long time (roughly 10 minutes for 10 GB)
  • It greatly simplifies the code, because all the logic for keeping the cache and the filesystem consistent lives in the OS. This design makes the implementation very simple: there is no need to carefully maintain data in memory, since data is written out to disk right away

In short, Kafka follows a pagecache-centric design: instead of keeping as much content as possible in its own memory, it writes content to disk as soon as possible, and all data is promptly written to the filesystem as a persistent log.

So how exactly does Kafka achieve high throughput and low latency?
Roughly, Kafka writes data like this: it first writes to the operating system's page cache and then lets the OS decide when to flush to disk.
Kafka's high throughput and low latency therefore come mainly from the following four points:

  • The page cache lives in memory, so message writes are very fast.
  • Kafka does not have to deal with the low-level details of the filesystem; all the tedious I/O operations are handled by the operating system.
  • Kafka appends to its logs, which avoids random disk writes.
  • Zero-copy techniques such as sendfile make reads efficient (see the sketch a bit further down).

Another benefit of using the page cache rather than heap memory: when a Kafka broker process crashes, the data in its heap is lost, but the data in the page cache survives, and the restarted broker can continue serving from it.
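The zero-copy path mentioned above can be illustrated with Java NIO's FileChannel.transferTo, which on Linux maps to sendfile(2). This is only a minimal sketch of the idea, not Kafka's actual code; the file path, host and port are made-up placeholders.

```scala
import java.io.{File, FileInputStream}
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Minimal sketch of sendfile-style zero copy: the file's bytes go from the page cache
// to the socket without being copied into user-space buffers first.
object ZeroCopyDemo {
  def main(args: Array[String]): Unit = {
    val fileChannel = new FileInputStream(new File("/tmp/segment.log")).getChannel // hypothetical log segment
    val socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))      // hypothetical consumer
    var position = 0L
    val size = fileChannel.size()
    while (position < size) {
      // transferTo may send fewer bytes than requested, so loop until everything is sent
      position += fileChannel.transferTo(position, size - position, socket)
    }
    fileChannel.close()
    socket.close()
  }
}
```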

What does the page cache buy us?
Linux gives any memory that applications are not using to the page cache; run free or cat /proc/meminfo and the "Cached" figure is the page cache.

In the page cache, each file is a radix tree (also called a PAT trie, a kind of multi-way search tree) whose nodes are 4 KB pages, so a page can be located quickly from a file offset (e.g. 0x1110001).

When a write happens, the data is simply written into the page cache and the page is marked dirty.

When a read happens, the page cache is checked first; on a hit the data is returned directly, otherwise the file is read from disk into the page cache and then served from there.

So as long as producers and consumers run at roughly the same pace, consumers read the data the producers just wrote into the page cache; the handoff happens entirely in memory, with no disk access at all.

Compared with the traditional approach of keeping another copy of the messages in process memory, this does not waste a second copy's worth of memory, the page cache needs no GC (so 60 GB of RAM can be used without worry), and the page cache even survives a Kafka restart.

For the page cache itself, the best reference is Understanding the Linux Kernel; here is an excerpt:
The page cache is the main disk cache used by the Linux kernel. In most cases, the kernel refers to the page cache when reading from or writing to disk. New pages are added to the page cache to satisfy User Mode processes' read requests. If the page is not already in the cache, a new entry is added to the cache and filled with the data read from the disk. If there is enough free memory, the page is kept in the cache for an indefinite period of time and can then be reused by other processes without accessing the disk.

So to study distributed systems in depth, you really have to study the Linux kernel in depth as well.

References

RAID: https://zh.wikipedia.org/wiki/RAID
SSD vs RAID: https://mybroadband.co.za/news/hardware/255699-how-raid-can-give-your-hard-drives-ssd-like-performance.html
PCIe SSD: https://searchstorage.techtarget.com/definition/PCIe-SSD-PCIe-solid-state-drive
Common questions: https://zhuanlan.zhihu.com/p/65512721
Index files: https://matt33.com/2016/03/08/kafka-store/

Page cache:
https://www.cnblogs.com/sxhlinux/p/6376480.html
https://qinglinmao8315.github.io/linux/2018/03/14/linux-page-cache.html

Kafka reads and writes: https://medium.com/@durgaswaroop/a-practical-introduction-to-kafka-storage-internals-d5b544f6925f

Kafka high availability

How to guarantee that messages are neither lost nor consumed more than once

https://www.zhihu.com/question/34842764
https://blog.csdn.net/matrix_google/article/details/79888144
https://blog.csdn.net/qingqing7/article/details/80054281
Message loss is already covered in Kafka: The Definitive Guide; for duplicate consumption, however, the book only explains how to reduce the number of duplicates.

To be clear up front, Kafka's delivery semantics are at-least-once by design; see:
https://www.infoq.cn/article/kafka-analysis-part-8
http://www.jasongj.com/2015/03/10/KafkaColumn1/#Kafka-delivery-guarantee

This is where idempotence comes in, a concept from mathematics. Applied to programming it means that, for the same system under the same conditions, a single request and the same request repeated many times have the same effect on system resources. (One more reason to keep studying math.)
https://www.jianshu.com/p/475589f5cd7b

Baeldung also has an article on this topic:
In this tutorial, we’ll look at how Kafka ensures exactly-once delivery between producer and consumer applications through the newly introduced Transactional API.
https://www.baeldung.com/kafka-exactly-once
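As a concrete illustration of the Transactional API mentioned in the Baeldung article, here is a minimal Scala sketch against the Java producer client. The broker address, topic and transactional.id are placeholders, and error handling is simplified (a real application treats fatal exceptions like ProducerFencedException differently).

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch of an idempotent, transactional producer: either every record in the
// transaction becomes visible to read_committed consumers, or none of them does.
object TransactionalProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("enable.idempotence", "true")     // broker de-duplicates producer retries
    props.put("transactional.id", "demo-txn-1") // required for the transactional API

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    try {
      producer.beginTransaction()
      producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
      producer.commitTransaction()
    } catch {
      case e: Exception =>
        producer.abortTransaction() // simplified: fatal exceptions should close the producer instead
        throw e
    } finally {
      producer.close()
    }
  }
}
```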

How Kafka differs from a typical message queue

Multiple Consumers

In addition to multiple producers, Kafka is designed for multiple consumers to read any single stream of messages without interfering with each other. This is in contrast to many queuing systems where once a message is consumed by one client, it is not available to any other. Multiple Kafka consumers can choose to operate as part of a group and share a stream, assuring that the entire group processes a given message only once.

https://stackoverflow.com/questions/35561110/can-multiple-kafka-consumers-read-same-message-from-the-partition
Kafka allows multiple consumer groups to consume the same message at the same time.
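A small Scala sketch of that behavior, using the standard consumer client; the broker address, topic name and group ids are made up for illustration:

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Consumers in *different* groups each receive every message of the topic;
// consumers that share a group.id split the partitions between them instead.
def newConsumer(groupId: String): KafkaConsumer[String, String] = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", groupId)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("demo-topic"))
  consumer
}

val analytics = newConsumer("analytics-group") // receives every message of demo-topic
val audit     = newConsumer("audit-group")     // independently receives every message as well
analytics.poll(Duration.ofSeconds(1)).forEach(r => println(s"analytics: ${r.value()}"))
audit.poll(Duration.ofSeconds(1)).forEach(r => println(s"audit: ${r.value()}"))
```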

Kafka: The Definitive Guide notes

Kafka configuration: log.dirs

The path where Kafka stores its data. It does not have to be a single path: several can be given, separated by commas. Whenever a new partition is created, it is placed under the path that currently contains the fewest partitions.
Note that the broker will place a new partition in the path that has the least number of partitions currently stored in it, not the least amount of disk space
So the broker picks the directory with the fewest partitions, not the one with the most free disk space.

Memory

The normal mode of operation for a Kafka consumer is reading from the end of the partitions, where the consumer is caught up and lagging behind the producers very little, if at all. In this situation, the messages the consumer is reading are optimally stored in the system’s page cache, resulting in faster reads than if the broker has to reread the messages from disk. Therefore, having more memory available to the system for page cache will improve the performance of consumer clients. Kafka itself does not need much heap memory configured for the Java Virtual Machine (JVM). Even a broker that is handling X messages per second and a data rate of X megabits per second can run with a 5 GB heap. The rest of the system memory will be used by the page cache and will benefit Kafka by allowing the system to cache log segments in use. This is the main reason it is not recommended to have Kafka collocated on a system with any other significant application, as they will have to share the use of the page cache. This will decrease the consumer performance for Kafka.

https://en.wikipedia.org/wiki/Page_cache

page 33

Linux swap space, how vm.swappiness is used, virtual memory
Virtual memory is a memory-management technique.
Virtual memory is a combination of RAM and disk space that running processes can use.
It lets an application believe it has a contiguous block of available memory (one continuous address space), while in reality that memory is usually split into many physical fragments, with parts of it temporarily stored on disk and swapped in when needed. Compared with systems that do not use virtual memory, this makes large programs much easier to write and uses real physical memory (RAM) more efficiently.

It is an important kernel parameter for Kafka because the more memory allocated to swap space, the less memory can be allocated to the page cache. Cloudera recommends setting vm.swappiness to 1.

So for Kafka, swapping to disk should effectively be disabled.

page 60

If a key exists and the default partitioner is used, Kafka will hash the key (using its own hash algorithm, so hash values will not change when Java is upgraded), and use the result to map the message to a specific partition. Since it is important that a key is always mapped to the same partition, we use all the partitions in the topic to calculate the mapping — not just the available partitions. This means that if a specific partition is unavailable when you write data to it, you might get an error. This is fairly rare, as you will see in Chapter 6 when we discuss Kafka’s replication and availability. The mapping of keys to partitions is consistent only as long as the number of partitions in a topic does not change. So as long as the number of partitions is constant, you can be sure that, for example, records regarding user 045189 will always get written to partition 34. This allows all kinds of optimization when reading data from partitions. However, the moment you add new partitions to the topic, this is no longer guaranteed — the old records will stay in partition 34 while new records will get written to a different partition. When partitioning keys is important, the easiest solution is to create topics with sufficient partitions (Chapter 2 includes suggestions for how to determine a good number of partitions) and never add partitions.

If there are 10 machines and topicA has 5 partitions, the hash of a key is computed against the partitions, not against the machines.

As the excerpt says, the key-to-partition mapping is computed over all partitions, not just the available ones: if a dead node's partitions were excluded from the hash, a key could suddenly map to a different partition. The guarantee only holds while the topic's partition count stays the same. A topic's partitions are numbered and each partition is replicated, so even if a machine dies, reads and writes can continue through a replica, and the replica keeps the same partition number as the previous leader.

org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition

Partitions in a Kafka topic are numbered from 0…N.

Kafka's DefaultPartitioner uses modulo for the same purpose

The book explains further:
Keys are used when messages are to be written to partitions in a more controlled manner. The simplest such scheme is to generate a consistent hash of the key, and then select the partition number for that message by taking the result of the hash modulo the total number of partitions in the topic. This assures that messages with the same key are always written to the same partition. Keys are discussed in more detail in Chapter 3.

Kafka does not need consistent hashing; by default it simply takes the hash modulo the partition count, and the partition count must stay unchanged so that a given key always lands on the same partition.
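A sketch of that hash-then-modulo rule, reusing the murmur2 helper that ships with the Kafka clients library (the real DefaultPartitioner also handles null keys and batching details that are skipped here); the key and partition count are made up:

```scala
import org.apache.kafka.common.utils.Utils

// "Hash the serialized key, then take it modulo the total partition count."
def partitionFor(keyBytes: Array[Byte], numPartitions: Int): Int =
  Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

val numPartitions = 6
val p1 = partitionFor("user-045189".getBytes("UTF-8"), numPartitions)
val p2 = partitionFor("user-045189".getBytes("UTF-8"), numPartitions)
assert(p1 == p2) // the same key maps to the same partition as long as numPartitions never changes
```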

page 76

kafka-re-process-message

If the committed offset is smaller than the offset of the last message the client processed, the messages between the last processed offset and the committed offset will be processed twice.

Say the last committed offset is 2, but the latest poll returned messages 7-11: messages 3-6 have already been processed, message 10 is being processed, and 11 has not been touched. If the consumer restarts from 2+1, messages 3-10 are processed twice.

kafka-lost-message

If the committed offset is larger than the offset of the last message the client actually processed, all messages between the last processed offset and the committed offset will be missed by the consumer group

Say the committed offset is 11, but of the messages 4-11 returned by the last poll only up to message 5 was actually processed; messages 6-11 will then never be processed.

As for why the committed offset can diverge from the position of the last processed message, and whether there is another way around it:

In the first case the commits cannot keep up with the polls; in the second case the processing cannot keep up with the commits.

Because offsets are auto-committed every 5 seconds by default, the offsets returned by a poll can be larger or smaller than the last committed offset; even lowering the interval to 1 second cannot eliminate this completely.

Setting auto-commit of offsets to false prevents message loss, but it cannot completely prevent duplicate consumption; it only reduces the number of duplicates. Taking the first figure: if offsets are committed manually after each polled batch is processed, message 6 would be the last committed offset, and if the consumer dies while processing message 10, only messages 7-10 are reprocessed instead of 2-10.
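A minimal sketch of that manual-commit pattern with the standard consumer API (broker, topic and group id are placeholders); a crash between processing and commitSync() can still cause re-delivery, but nothing is skipped:

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Auto commit is disabled; the offset is committed only after the polled batch was processed.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "manual-commit-group")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("demo-topic"))
while (true) {
  val records = consumer.poll(Duration.ofSeconds(1))
  records.forEach(r => println(s"processing offset ${r.offset()}: ${r.value()}"))
  if (!records.isEmpty) consumer.commitSync() // commit only after processing succeeded
}
```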

page 97

Another task the leader is responsible for is knowing which of the follower replicas is up-to-date with the leader. Followers attempt to stay up-to-date by replicating all the messages from the leader as the messages arrive, but they can fail to stay in sync for various reasons, such as when network congestion slows down replication or when a broker crashes and all replicas on that broker start falling behind until we start the broker and they can start replicating again.

After the broker restarts, though, the partition leaders will have changed, since new leaders are elected when a broker goes down. Once the machine comes back up, replication picks up where it left off. If the machine cannot be brought back at all, a new machine can be started with the same broker id and it will copy all the messages over; the new machine will not become the controller, and it starts out as a follower for its partitions rather than a leader.

When a topic is created, the cluster assigns partitions as described above, with leaders spread evenly. Over time, however, when a broker goes down its followers become leaders; after the old leader's broker restarts and catches up, it rejoins the ISR as a follower and no longer serves reads and writes. Leaders then end up unevenly distributed, and only an even distribution of partition leaders gives a balanced load.
https://www.jianshu.com/p/c987b5e055b0

page 71

You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.
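A rough Scala sketch of that pattern (broker, topic and group id are placeholders): each Runnable owns its own KafkaConsumer and never shares it across threads.

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.Executors
import org.apache.kafka.clients.consumer.KafkaConsumer

// One consumer per thread: three consumers in the same group, each on its own thread,
// splitting the topic's partitions between them.
class ConsumerLoop(id: Int) extends Runnable {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "shared-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  private val consumer = new KafkaConsumer[String, String](props)

  override def run(): Unit = {
    consumer.subscribe(Collections.singletonList("demo-topic"))
    try {
      while (!Thread.currentThread().isInterrupted)
        consumer.poll(Duration.ofSeconds(1)).forEach(r => println(s"consumer-$id: ${r.value()}"))
    } finally consumer.close()
  }
}

val pool = Executors.newFixedThreadPool(3)
(1 to 3).foreach(i => pool.submit(new ConsumerLoop(i)))
```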

page 107

By default, each segment contains either 1 GB of data or a week of data, whichever is smaller.
A new segment is rolled as soon as either limit is hit, 1 GB or 7 days, whichever comes first; note that it is an OR, much like a car service interval of "3 years or 100,000 km".

As a Kafka broker is writing to a partition, if the segment limit is reached, we close the file and start a new one.

The segment we are currently writing to is called an active segment. The active segment is never deleted, so if you set log retention to only store a day of data but each segment contains five days of data, you will really keep data for five days because we can’t delete the data before the segment is closed. If you choose to store data for a week and roll a new segment every day, you will see that every day we will roll a new segment while deleting the oldest segment - so most of the time the partition will have seven segments.
In other words, even if log retention is set to one day, when it takes five days for a segment to reach 1 GB (so a new segment is only rolled every five days), the data is effectively kept for five days, because the active segment cannot be deleted.

Kafka does have a potential problem here: with too many partitions there are many log segments, and although writes are batched, at the disk level they effectively become random writes, and random I/O hurts performance badly. So, generally, Kafka should not have too many partitions. RocketMQ addresses this by writing all logs into a single file, which makes writes sequential, and with some optimization reads can come close to sequential as well.
The paragraph above is from an Alibaba blog post and seems wrong to me: a single partition does give sequential writes. However many partitions there are, each individual partition is still written sequentially. For example, with one producer, one million messages, 5 machines and 100 partitions (20 partitions per machine), each partition receives about 10,000 messages on average; how would that become random writes? Within a partition only one segment is active at a time; there is no such thing as multiple active segments.

Our suggestion: for a cluster of 3 brokers with 5 consumers, use 15 partitions, i.e. the least common multiple of the broker count and the consumer count. A multiple of the broker count that exceeds the consumer count, such as 6 or 9, also works; adjust to your own environment.
https://bbs.huaweicloud.com/blogs/bf79ccc8d0fd11e7b8317ca23e93a891

pyspark introduction

Spark ships with many machine-learning APIs in Scala, Java, and Python, but for deep learning you need a third-party package, and it has to be used from Python, i.e. pyspark.
The package used here is Deep Learning Pipelines, maintained by Databricks:
https://github.com/databricks/spark-deep-learning#working-with-images-in-spark
https://docs.databricks.com/applications/deep-learning/single-node-training/deep-learning-pipelines.html

Install the third-party deep learning package

$SPARK_HOME/bin/spark-shell --packages databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11

install python3.6, pip

yum install -y https://centos7.iuscommunity.org/ius-release.rpm
yum install -y python36u python36u-libs python36u-devel python36u-pip

Test:
python3.6 -V

Upgrade pip:
pip install --upgrade pip

Python environment variable

Add export PYSPARK_PYTHON=/usr/bin/python3.6 to conf/spark-env.sh

First program

First install this third-party package:
pip install numpy

Start the shell:
./bin/pyspark

from pyspark.ml.image import ImageSchema
image_df = ImageSchema.readImages("/data/myimages")

from sparkdl.image import imageIO as imageIO
This module also has to be installed with pip.

Summary

Why change Spark's default Python? The default turned out to be Python 2.6, and with it the third-party packages still come up as not found even after installation.

https://github.com/databricks/spark-deep-learning/tree/master/python contains the Python API
https://github.com/databricks/spark-deep-learning/tree/master/src/main/scala contains the actual implementation

spark source code debug

Download the Spark source straight from GitHub:
\spark-2.4.2\core\src\test\scala\org\apache\spark\SortShuffleSuite.scala

test("mytest") {
  sc = new SparkContext("local", "test", conf)
  val dfsFilename = "c:/temp/abc.txt"
  val readFileRDD = sc.textFile(dfsFilename)
  val wcounts1 = readFileRDD.flatMap(line => line.split(" "))
  wcounts1.collect().foreach(println)
}

test("mytest2") {
  sc = new SparkContext("local", "test", conf)
  val dfsFilename = "c:/temp/abc.txt"
  val readFileRDD = sc.textFile(dfsFilename)
  val wcounts1 = readFileRDD.flatMap(line => line.split(" ")).filter(w => (w == "Humpty") || (w == "Dumpty")).map(word => (word, 1)).reduceByKey(_ + _)
  wcounts1.collect().foreach(println)
}

The easiest approach is to find a test in the source tree that already sets up a SparkContext and add your own code to it.

spark rdd

The RDD abstraction is the core of Spark and has the following properties:
immutable: read-only
resilient: fault-tolerant
distributed: the data is stored across the machines of the cluster
in-memory: the data lives in memory (Spark also offers other persistence options)
lazily evaluated: the real work only runs when an action is triggered (see the sketch right after this list)
cacheable: when memory cannot hold all the data, it can be cached to disk
typed: every RDD has a type, e.g. RDD[String] or RDD[(Int, String)]
partitioned: the data of an RDD is split into partitions spread across the cluster, each processed by a single executor (JVM) task
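A tiny sketch of that lazy evaluation (assuming an existing SparkContext named sc, as in the other examples in these notes):

```scala
// map and filter below only record lineage; nothing is executed until the count() action.
val nums    = sc.parallelize(1 to 1000000)
val squares = nums.map(x => x.toLong * x)  // transformation: not executed yet
val big     = squares.filter(_ > 1000L)    // still nothing executed
println(big.count())                       // action: the whole pipeline runs now
```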

https://www.zhihu.com/question/23079001
This post discusses why a DAG of many RDDs does not exhaust memory: if RDDs are immutable, wouldn't the memory of all those RDDs add up?

iterator rdd
https://www.jianshu.com/p/c05d33b4d70d

RDD immutability
https://www.zybuluo.com/BrandonLin/note/448121
When one RDD is computed from another, the earlier one can be discarded; it is not the case that every RDD in the DAG keeps holding memory.
The RDDs can also share underlying memory; they are only immutable at the API level.
If the user has not asked Spark to cache an RDD, the memory used by this process is small: once an element has been processed it is written out or thrown away (conceptually at least; the implementation uses buffers), so it never occupies memory for long.

RDD source code analysis
https://www.jianshu.com/p/f5b50e233acc
https://mr-dai.github.io/spark_core_source_1/
https://stackoverflow.com/questions/30446706/implementing-custom-spark-rdd-in-java

val textFile = sc.textFile("hdfs://…")
val counts = textFile.flatMap(line => line.split(" "))
  .filter(_.length >= 2)
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://…")

In the example above, textFile produces a HadoopRDD followed by a MapPartitionsRDD:
textFile is a HadoopRDD with a map applied, i.e. a MapPartitionsRDD;
after flatMap it is still a MapPartitionsRDD;
filter produces a new MapPartitionsRDD;
map again yields a MapPartitionsRDD;
and finally reduceByKey turns it into a ShuffledRDD.
The essence of an RDD is its compute method plus the iterator concept:
def compute(split: Partition, context: TaskContext): Iterator[T]

The abstract RDD class requires every subclass to implement compute; one of its parameters is a Partition object, and its job is to compute the data of that partition.
When an action finally runs, the iterators are evaluated layer by layer up the chain of parents.
A couple more examples:

val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")

At runtime Spark dynamically builds a composite Iterator. For the example above, the constructed Iterator conceptually looks roughly like the pseudocode below.
So not every transformation materializes a new copy of the data; the operations run as a pipeline, at least within a single stage.
Pseudocode:

new Iterator[String] {
  private var head: String = _
  private var headDefined: Boolean = false

  def hasNext: Boolean = headDefined || {
    do {
      try head = readOneLineFromHDFS(…)    // (1) read from HDFS
      catch {
        case _: EOFException => return false
      }
    } while (!head.startsWith("ERROR"))    // (2) filter closure
    true
  }

  def next: String = if (hasNext) {
    headDefined = false
    head.split(" ")(1)                     // (3) map closure
  } else {
    throw new NoSuchElementException("…")
  }
}

Is it fair to say that operations such as map and mapPartitions actually process the data row by row at runtime?
Yes.
Spark's execution can be seen as: read data from an RDD, process it, and, depending on the action, send the result back to user space. Along the way there can be many intermediate RDDs. If you set cache on one of them, its results are cached at the end of the step that computes it; the cache is managed by Spark's BlockManager. Note a common beginner misconception: calling cache or persist does not cache anything at that moment. Caching only actually happens once an action is triggered, the job is submitted, and the computation is running.
My understanding is that the intermediate results are cached as the action runs: Spark knows which RDD you asked to cache, marks that RDD's results, and stores them once they have been computed.
Data flows through the pipeline record by record from start to end. The most obvious benefit: there is no need to load the full dataset, and the previous step's result can be dropped immediately. A fully materialized dataset is something everyone tries to avoid, so in a sense, without a shuffle, Spark needs very little memory; a single record takes almost no space. Second, if instead of pipelining every operation were applied eagerly to the full data, an intermediate container would be needed to hold the results, which is essentially back to the old MapReduce approach and far less efficient.
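A small sketch of the cache-on-action behavior described above (sc is an existing SparkContext; the path is a placeholder):

```scala
// cache() only *marks* the RDD; its blocks are materialized by the first action that computes it.
val lines  = sc.textFile("hdfs://namenode/logs/app.log")
val errors = lines.filter(_.startsWith("ERROR")).cache() // nothing is cached yet

val first  = errors.count()  // the job runs; computed partitions are stored by the BlockManager
val second = errors.count()  // served from the cached blocks, no re-read from HDFS
```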

That is exactly what MapPartitionsRDD's compute method does: given the parent dependency, it wraps one more function around the parent RDD's iterator as each element is traversed.
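A conceptual Scala sketch of that idea: compute just asks the parent for its iterator over the same partition and wraps another function around it, so records stream through one at a time. Spark's real MapPartitionsRDD carries extra bookkeeping (preservesPartitioning, the partitioner, and so on) that is omitted here.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Simplified map-style RDD: one-to-one dependency on the parent, same partitions,
// and compute() lazily wraps the parent's iterator with f.
class SimpleMapRDD[T: ClassTag, U: ClassTag](parent: RDD[T], f: T => U)
    extends RDD[U](parent) {

  override protected def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    parent.iterator(split, context).map(f)
}

// usage sketch: new SimpleMapRDD(sc.parallelize(1 to 10), (x: Int) => x * 2).collect()
```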

Spark: The Definitive Guide notes

page 29

In specifying this action, we started a Spark job that runs our filter transformation (a narrow transformation), then an aggregation (a wide transformation) that performs the counts on a per partition basis, and then a collect, which brings our result to a native object in the respective language. You can see all of this by inspecting the Spark UI, a tool included in Spark with which you can monitor the Spark jobs running on a cluster.

val myRange = spark.range(1000).toDF("number")
val divisBy2 = myRange.where("number % 2 = 0")
divisBy2.count()

At first it is puzzling why count would be a wide transformation and also involve a collect; looking at the execution plan and the Spark UI helps a lot.
Try writing the count with the RDD API and with the Dataset API and compare them, especially the Dataset side.

https://dzone.com/articles/apache-spark-3-reasons-why-you-should-not-use-rdds
This article also points out that count behaves differently on an RDD and on a DataFrame.

The count method on a Spark DataFrame is not the same thing as the count action on an RDD:
https://stackoverflow.com/questions/47194160/why-is-dataset-count-causing-a-shuffle-spark-2-2

Update, based on the SQL execution plan graph:
http://k8s-master:18080/history/app-20190628083643-0001/SQL/execution/?id=0
(stage diagram)
The first stage has two tasks, each computing a count of 250, so the HashAggregate emits 2 rows; these go through a shuffle into the second stage, which adds them up to a total count of 500, emitting 1 row.
Summary: in this case the partition count drops from 2 in the first stage to 1 in the second, and the plan does show it as a shuffle; wherever there is a shuffle there is a new stage.
The count here really is an action. The filter itself involves no shuffle, but count is implemented with a shuffle underneath; in the graph the filter indeed shows no wide transformation (shuffle), and the aggregation only happens after the filter.
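A small way to see this for yourself (same myRange/divisBy2 as above): Dataset.count() is executed as a global aggregation, so its plan roughly contains a per-partition HashAggregate, an Exchange (the shuffle), and a final HashAggregate, whereas RDD.count() just sums per-partition counts on the driver.

```scala
// Compare the plans / Spark UI for the two kinds of count.
val myRange  = spark.range(1000).toDF("number")
val divisBy2 = myRange.where("number % 2 = 0")

divisBy2.groupBy().count().explain() // roughly the plan behind Dataset.count(): HashAggregate -> Exchange -> HashAggregate
println(divisBy2.count())            // Dataset action, triggers the shuffle-based aggregation
println(divisBy2.rdd.count())        // plain RDD count, no shuffle stage
```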

page 299

spark.read
  .option("header", "true")
  .csv("/root/online-retail-dataset.csv")
  .repartition(4)
  .selectExpr("instr(Description, 'GLASS') >= 1 as is_glass")
  .groupBy("is_glass")
  .count()
  .collect()

This can be run in the shell:

scala> spark.read.option("header", "true").csv("/root/online-retail-dataset.csv").repartition(4).selectExpr("instr(Description, 'GLASS') >= 1 as is_glass").groupBy("is_glass").count().show()
+--------+------+
|is_glass| count|
+--------+------+
|    null|  1454|
|    true| 12861|
|   false|527594|
+--------+------+

Run it on the cluster:
./bin/spark-submit --class org.apache.spark.examples.SimpleApp2 --master spark://192.168.77.130:7077 --deploy-mode client --executor-memory 1500M --conf spark.eventLog.enabled=true --conf spark.sql.shuffle.partitions=6 /root/spark/spark-example-scala-1.0.jar

This breaks into three stages:
The first stage reads the data, partitioned by default according to CPU cores; with two worker nodes of one core each, that means two partitions.
The second stage repartitions to 4; it is still the same two machines, so the shuffle is cheap, mostly moving data within each machine.
The third stage runs the groupBy with the default 200 shuffle partitions (set to 6 via the parameter above). For groupBy, rows with the same key must end up in the same task, hence the shuffle; that is why, even with 200 tasks, only 3 tasks actually contain data. Finally, collect brings all the data back to the driver.

Execution plan:
http://k8s-master:18080/history/app-20190628102443-0003/SQL/execution/?id=1
(stage diagram)

This is the second stage; why does it output 12 rows?
The groupBy causes the next stage, but the previous stage does not need to shuffle hundreds of thousands of rows; it first runs a partial groupBy (an aggregation) within each partition, leaving 3 rows per partition.
With 4 partitions that gives 12 rows. These 12 rows are then processed in the final stage, and only three tasks there show shuffle reads, presumably because rows with the same key are routed to the same task and there are only three keys after the group by.

Here is how the book puts it:
Notice that in Figure 18-5 the number of output rows is six. This conveniently lines up with the number of output rows (3 in my run) multiplied by the number of partitions (4 here) at aggregation time. This is because Spark performs an aggregation for each partition (in this case a hash-based aggregation) before shuffling the data around in preparation for the final stage.

page 264

Regardless of the number of partitions, that entire stage is computed in parallel. The final result aggregates those partitions individually, brings them all to a single partition before finally sending the final result to the driver. We’ll see this configuration several times over the course of this part of the book.

Tasks
Stages in Spark consist of tasks. Each task corresponds to a combination of blocks of data and a set of transformations that will run on a single executor. If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel. A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel. This is not a panacea, but it is a simple place to begin with optimization.

page 133

val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/retail-data/all/online_retail_dataset-92e8e.csv")

This DataFrame has 541,909 rows in total.

df.groupBy("InvoiceNo", "CustomerId").count().count()

This returns 25900, which is the number of rows after grouping.

df.groupBy("InvoiceNo", "CustomerId").count().show()

This shows the grouped data; the first count returns a Dataset, so the count action can be chained on top of it.
The job has two stages.
The first stage has 8 tasks, each reading one eighth of the data, followed by a shuffle write; the number of rows written is 25907.
The second stage reads that data; the grouped results from the 8 partitions partly overlap, so after merging them once more, 25900 rows remain.

page 57

Overview of Structured Spark Types
Spark is effectively a programming language of its own. Internally, Spark uses an engine called Catalyst that maintains its own type information through the planning and processing of work. In doing so, this opens up a wide variety of execution optimizations that make significant differences. Spark types map directly to the different language APIs that Spark maintains and there exists a lookup table for each of these in Scala, Java, Python, SQL, and R. Even if we use Spark’s Structured APIs from Python or R, the majority of our manipulations will operate strictly on Spark types, not Python types. For example, the following code does not perform addition in Scala or Python; it actually performs addition purely in Spark:
Below is an example of the compile-time error you get with Spark's typed Dataset API:

case class Person(name: String, age: Int)
val personRDD = sc.makeRDD(Seq(Person("A", 10), Person("B", 20)))
val personDF = sqlContext.createDataFrame(personRDD)
val ds: Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error: value salary is not a member of Person
ds.rdd // returns RDD[Person]

deep learning

Training and models

Think of the machine as a small child you take to the park, where lots of people are walking their dogs. For simplicity, consider a binary classification problem. You tell the child "this animal is a dog", and "that one is a dog too". Then a cat runs by and you tell him "this one is not a dog". Over time the child forms a pattern of recognition. That learning process is called "training", and the recognition pattern it produces is the "model". After training, when another animal runs past and you ask the child "is this a dog?", he answers yes or no; that is "prediction". A model has many parameters. Some of them can be obtained through training, such as the weights of a logistic model. Others cannot be learned by training and are called "hyperparameters", for example the learning rate; these have to be found through experience or methods such as grid search. In the example above someone tells the child the correct class of each sample, which is supervised learning. There is also unsupervised learning, for example the child spontaneously recognizing and grouping animals by similarity.

My own understanding:
The child's recognition pattern is a model; you feed it enough data to train a new pattern that fits your own task.
What gets trained are the parameters inside the model. The model is a black box from input to output, and training (with lots of data) makes that black box fit the task at hand better.
That is why deep learning starts with choosing a model. Most people cannot write such models themselves, so you usually pick an open-source one and train it on your own large dataset to turn it into a model of your own.
Deep learning can be understood as machine learning done with deep neural networks (DNNs); the relationship between the two follows directly from that definition.

So it is enough to understand neural networks well enough to use them; there is no need to implement them yourself, since open-source deep learning frameworks already exist.

Frameworks and hardware

As such, your Keras model can be trained on a number of different hardware platforms beyond CPUs:

NVIDIA GPUs
Google TPUs, via the TensorFlow backend and Google Cloud
OpenCL-enabled GPUs, such as those from AMD, via the PlaidML Keras backend

CUDA

So what is CUDA? CUDA is general-purpose computing on the GPU. In a game the GPU computes the colors of a pile of pixels, but it can just as well perform any other computation, such as large matrix multiplications. The program prepares the corresponding arrays plus a description of how the GPU should process them (how many threads to launch inside the GPU, how to compute, and so on). The data and the description are handed to CUDA through functions in the CUDA libraries; CUDA calls the user-mode graphics driver to compile the CUDA program, and that driver then passes the commands and the compiled program and data to the GPU through the kernel-mode driver.