The relationship between Hive and Spark

https://www.quora.com/How-does-Apache-Spark-and-Apache-Hive-work-together/answer/Angela-Zhang-%E5%BC%B5%E5%AE%89%E7%90%AA
This article explains it best.

Hive can swap its underlying execution engine for Spark.
Spark can also execute SQL on top of the Hive metastore.

Spark SQL - Spark module that makes use of the Hive Metastore. You can use this via HiveContext. This currently (as of 2017) has better support, so we’ll talk more about this in the next section.

Hive on Spark - Hive project that integrates Spark as an additional engine. You can enable this by doing hive.execution.engine=spark. This support was added recently (2015 - 2016).

Concurrency and parallelism

Even when Tomcat runs on a multi-core CPU, it is still multi-threaded concurrency, not parallelism.
Tomcat uses multiple threads internally, and the operating system schedules those threads onto multiple CPU cores.
Parallelism proper means one task is decomposed into multiple subtasks, as with Java 7's fork/join or Spark's distributed computation, and that really does need a multi-core CPU or multiple CPUs; see the fork/join sketch below.

Concurrency vs. parallelism still needs more study.
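As an illustration of what parallelism means here, below is a minimal Java fork/join sketch (the class name, threshold and array size are made up for illustration): one summing task is recursively split into subtasks that the pool can run on several CPU cores at once.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Minimal fork/join sketch: one summing task is split into subtasks
// that the pool can execute in parallel on multiple CPU cores.
public class ParallelSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // below this, just compute directly
    private final long[] data;
    private final int start, end;

    ParallelSum(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {           // small enough: sequential sum
            long sum = 0;
            for (int i = start; i < end; i++) sum += data[i];
            return sum;
        }
        int mid = (start + end) / 2;              // otherwise split into two subtasks
        ParallelSum left = new ParallelSum(data, start, mid);
        ParallelSum right = new ParallelSum(data, mid, end);
        left.fork();                              // run the left half asynchronously
        return right.compute() + left.join();     // compute the right half, then wait for the left
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        long sum = new ForkJoinPool().invoke(new ParallelSum(data, 0, data.length));
        System.out.println("sum = " + sum);
    }
}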

spark shuffle

the Spark SQL module contains the following default configuration: spark.sql.shuffle.partitions set to 200.

The execution plan below shows 200 partitions:

scala> flightData2015.sort("count").explain()
== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
+- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/root/spark/data/testdata/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

shuffle-1

Spark SQL is the high-level API, so it is not governed by spark.default.parallelism, which applies to the low-level RDD API.
For the high-level API, spark.sql.shuffle.partitions controls how the data is partitioned at shuffle time.

This seems different from my earlier understanding - does that mean transformations without a shuffle do not need this partition setting?

http://blog.itpub.net/31511218/viewspace-2213494/
The article above explains it well:
a narrow transformation's partition count is driven by the size of the input files, while a wide transformation uses spark.sql.shuffle.partitions. That should be the right picture; the sketch below shows both settings in action.

What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism?
https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa
spark.default.parallelism only takes effect when working with RDDs; it has no effect on Spark SQL.
spark.sql.shuffle.partitions is the setting dedicated to Spark SQL.
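As a minimal sketch (the CSV path reuses the file from the explain() example above; the partition numbers are arbitrary), this shows where each setting applies: a narrow operation keeps the partitioning of the input splits, while the output of a wide (shuffling) Dataset operation uses spark.sql.shuffle.partitions.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ShufflePartitionsDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ShufflePartitionsDemo")
                // Spark SQL shuffle partitions (default 200); only affects wide Dataset operations
                .config("spark.sql.shuffle.partitions", "50")
                // only used by the low-level RDD API
                .config("spark.default.parallelism", "8")
                .getOrCreate();

        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("/root/spark/data/testdata/2015-summary.csv"); // path from the example above

        // Narrow transformation: partition count comes from the input splits
        System.out.println("after filter: " + df.filter("count > 10").javaRDD().getNumPartitions());

        // Wide transformation: the shuffle output has up to spark.sql.shuffle.partitions partitions
        Dataset<Row> sorted = df.sort("count");
        System.out.println("after sort:   " + sorted.javaRDD().getNumPartitions());

        spark.stop();
    }
}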

Update

Shuffling is a process of redistributing data across partitions (aka repartitioning) that may or may not cause moving data across JVM processes or even over the wire (between executors on separate machines).
So a shuffle is not necessarily a cross-JVM or cross-node operation. For example, one stage may have 8 tasks on 8 machines, all of which write shuffle data, while the next stage has only one task running on a single node; that task's shuffle read pulls data from the local machine plus the other 7 machines. The new task still runs in the existing executor JVM on that node - Spark does not start a new JVM for it.

spark word count in practice

cluster ui spark-master-ui.PNG

spark master ui

Test file

To keep memory consumption down, the test data uses only a few distinct words:
xuhang meng
meng xuhang
xuhang xuhang xuhang
meng
xuhang
meng
meng liudehua
xuhang meng
meng xuhang
Duplicate this content over and over until the file reaches about 200 MB, which gives two HDFS blocks (one block = 128 MB). A sketch of one way to generate it follows.
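Here is one possible way to generate such a file (the output file name and target size are arbitrary); it simply repeats the sample lines until roughly 200 MB have been written:

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Repeats the sample lines until the file is about 200 MB (two 128 MB HDFS blocks).
public class GenerateTestFile {
    public static void main(String[] args) throws IOException {
        String block = String.join("\n",
                "xuhang meng", "meng xuhang", "xuhang xuhang xuhang",
                "meng", "xuhang", "meng", "meng liudehua",
                "xuhang meng", "meng xuhang") + "\n";
        long target = 200L * 1024 * 1024; // ~200 MB
        Path out = Paths.get("test.txt");
        try (BufferedWriter w = Files.newBufferedWriter(out, StandardCharsets.UTF_8)) {
            long written = 0;
            while (written < target) {
                w.write(block);
                written += block.getBytes(StandardCharsets.UTF_8).length;
            }
        }
        System.out.println("wrote " + Files.size(out) + " bytes to " + out.toAbsolutePath());
    }
}

The result can then be uploaded with bin/hdfs dfs -put test.txt /user/xuhang/test.txt.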

Test code

reduceByKey is the dividing point: everything before it is one stage, everything after it is another stage.

package org.apache.spark.examples;

import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkSession spark = SparkSession
            .builder()
            .appName("JavaWordCount")
            .getOrCreate();

        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
        // Note: repartition() returns a new RDD; the result is discarded here,
        // so this call has no effect on the partition count used below.
        lines.repartition(6);

        // narrow transformations: split lines into words, map each word to (word, 1)
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // wide transformation: reduceByKey shuffles and creates the stage boundary
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // action: triggers the job
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        spark.stop();
    }
}

Start the driver on the master node

The finished job: http://192.168.77.130:18080/history/app-20190624085329-0015/jobs/

parallelism=2: with parallelism set to 2, the 200 MB file is split into 2 partitions.
Each executor is given 2500 MB of memory; the actual memory consumption mainly depends on how many duplicate keys there are.
There are two stages with two tasks each, so roughly 1250 MB of memory per task - in other words, each task does not get the full 2.5 GB.
./bin/spark-submit --class org.apache.spark.examples.JavaWordCount --master spark://192.168.77.130:7077 --deploy-mode client --driver-memory=2g --executor-memory 2500M --conf spark.eventLog.enabled=true --conf spark.default.parallelism=2 /root/spark/spark-example-java-1.0.jar hdfs://k8s-master:9000/user/xuhang/test.txt
cluster-2-executor.PNG
This official diagram happens to match this example: two worker nodes, two tasks per node.
2-stage-4-task.PNG

A quick review of the stage and task concepts

  • stage: a stage is a building block of a job; a job is split into one or more stages, and the stages execute in sequence.
  • task: a unit of work within a stage, corresponding to one RDD partition. In general, an RDD with N partitions produces N tasks, because each task processes the data of exactly one partition.

Our example happens to have two partitions, i.e. 2 tasks per stage, and there also happen to be two stages.
Below are the 2 tasks of the first stage
stage0-task.PNG
Below are the 2 tasks of the second stage
stage1-task.PNG
You can see that both shuffle read and shuffle write show up.
shuffle write: happens before the shuffle; the data to be shuffled is written to disk. Why: to keep the data safe and to avoid holding large amounts of it in memory.
shuffle read: happens after the shuffle; it is the process of the downstream RDD reading the upstream RDD's data.
The details of shuffle need to be studied separately.

Going further

You can set parallelism=3 or a larger value to see more tasks run. With 3, each stage has 3 tasks,
so two stages give 6 tasks in total. If you do not set it, the number of partitions will be roughly the number of CPU cores, or 2 to 4 times that.

spark stage

intro

https://mapr.com/blog/how-spark-runs-your-applications/
Shuffling is a process of redistributing data across partitions (aka repartitioning) that may or may not cause moving data across JVM processes or even over the wire (between executors on separate machines).
Shuffling is the process of data transfer between stages.

When you invoke an action on an RDD, a “job” is created. Jobs are work submitted to Spark.
Jobs are divided into “stages” based on the shuffle boundary. This can help you understand.
Each stage is further divided into tasks based on the number of partitions in the RDD. So tasks are the smallest units of work for Spark.
Wide transformations basically result in stage boundaries.
Spark's stage boundaries are exactly the shuffle operations: if no shuffle happens, the operations stay in a single stage. Looking at the stage information on port 4040 helps deepen the understanding.
https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454
the stages are created based on the transformations. The narrow transformations will be grouped (pipe-lined) together into a single stage.

Actions such as collect are generally counted as part of the last stage, as in the sketch below.
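To make this concrete, here is a small sketch (it reuses the word-count pipeline from earlier; the class name is made up) that prints the RDD lineage with toDebugString(): the narrow flatMap/mapToPair steps are pipelined into one stage, and the ShuffledRDD introduced by reduceByKey marks where the next stage starts.

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class StageBoundaryDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("StageBoundaryDemo").getOrCreate();

        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
        // narrow transformations: pipelined into the same stage
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // wide transformation: reduceByKey introduces a shuffle, i.e. a stage boundary
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(Integer::sum);

        // The lineage shows a ShuffledRDD where the new stage starts
        System.out.println(counts.toDebugString());

        counts.collect(); // the action creates the job (two stages on the 4040 UI)
        spark.stop();
    }
}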

hadoop spark cluster setup

spark architecture
yarn architecture

Pseudo-distributed setup

This is mainly for learning Spark, using Hadoop 3.1.2, so I set up a simple pseudo-distributed cluster. The official docs have a few pitfalls.
First, configure the hostnames and IPs of the three machines, since some cases access nodes by hostname:

192.168.77.130  k8s-master
192.168.77.131 node1
192.168.77.132 node2

This generally takes effect immediately.

Modify the configuration files following the official docs

https://hadoop.apache.org/docs/r3.1.2/hadoop-project-dist/hadoop-common/SingleCluster.html#Pseudo-Distributed_Operation
The official docs mention these two files: etc/hadoop/core-site.xml and etc/hadoop/hdfs-site.xml.
My core-site.xml adds hadoop.tmp.dir on top of the official example; also, use k8s-master or the IP rather than localhost, since localhost causes remote connections to fail.

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/root/hadoop-3.1.2/tmp</value>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://k8s-master:9000</value>
    </property>
</configuration>

hdfs-site.xml adds dfs.namenode.name.dir and dfs.datanode.data.dir compared to the official example:

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/root/hadoop-3.1.2/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/root/hadoop-3.1.2/dfs/data</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

The point of these extra directory settings is that when startup fails you can easily delete the files in those directories by hand; of course, the three directories have to be created in advance.

Update the start and stop scripts

Add the following to both sbin/start-dfs.sh and sbin/stop-dfs.sh:
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

There is probably a better place for this, e.g. something like a dfs-env.sh that both scripts source, so it only needs to be added once.
For example, it can be set once centrally in hadoop-3.1.2/etc/hadoop/hadoop-env.sh.

Start

bin/hdfs namenode -format
sbin/start-dfs.sh

You can also run ./sbin/start-all.sh to start HDFS and YARN at the same time; of course this is only a pseudo-distributed setup.

[root@k8s-master hadoop-3.1.2]# sbin/start-
start-all.cmd start-balancer.sh start-dfs.sh start-yarn.cmd
start-all.sh start-dfs.cmd start-secure-dns.sh start-yarn.sh
[root@k8s-master hadoop-3.1.2]# sbin/start-all.sh
Starting namenodes on [k8s-master]
Last login: Thu Jun 13 21:16:03 CST 2019 from 192.168.77.3 on pts/0
Starting datanodes
Last login: Thu Jun 13 21:16:14 CST 2019 on pts/0
Starting secondary namenodes [k8s-master]
Last login: Thu Jun 13 21:16:17 CST 2019 on pts/0
Starting resourcemanager
Last login: Thu Jun 13 21:16:25 CST 2019 on pts/0
Starting nodemanagers
Last login: Thu Jun 13 21:16:30 CST 2019 on pts/0

test

hdfs ui: http://k8s-master:9870
yarn ui: http://k8s-master:8088

bin/hdfs dfs -mkdir /user
bin/hdfs dfs -mkdir /user/xuhang
bin/hdfs dfs -put etc/hadoop/*.xml /user/xuhang

Deploy MapReduce

https://hadoop.apache.org/docs/r3.1.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
Following this you can create a Java MapReduce job, compile and package it, and run it on YARN; a trimmed sketch of the tutorial's word count is below.
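For reference, the word count from that tutorial looks roughly like this (essentially the official example, lightly trimmed and commented):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emit (word, 1) for every token
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) sum += val.get();
            result.set(sum);
            context.write(key, result); // emit (word, total count)
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Package it into a jar and run it with bin/hadoop jar wordcount.jar WordCount <input> <output>; with YARN running, the same command submits it to the cluster.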

start spark cluster

https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually

On the master machine (k8s-master):
./sbin/start-master.sh

On the other machines:
./sbin/start-slave.sh spark://k8s-master:7077

Here Spark does not run on YARN; it only uses the standalone cluster manager, and in a moment we will read data from HDFS.

spark ui

http://k8s-master:8080/
driver, worker nodes

http://k8s-master:4040
The monitoring UI is available while a job is running; to view it after the job finishes, see the history log section.

simple spark code

package org.apache.spark.examples

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "hdfs://k8s-master:9000/user/xuhang/core-site.xml" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile)
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

java version

/* SimpleApp.java */
package org.apache.spark.examples;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: SimpleApp <file>");
            System.exit(1);
        }

        SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
        Dataset<String> logData = spark.read().textFile(args[0]).cache();
        logData.show();

        FilterFunction<String> f1 = s -> s.contains("a");
        FilterFunction<String> f2 = s -> s.contains("b");

        Dataset<String> newDs = logData.filter(f1);
        System.out.println("start to print new ds");
        newDs.show();
        System.out.println("stop to print new ds");
        long numAs = newDs.count();
        long numBs = logData.filter(f2).count();

        System.out.println("my Lines with a: " + numAs + ", lines with b: " + numBs);

        spark.stop();
        System.out.println("hello world");
    }
}

spark history log

mkdir /tmp/spark-events (the default event log directory)

./bin/spark-submit --class org.apache.spark.examples.SimpleApp --master spark://192.168.77.130:7077 --deploy-mode client --executor-memory 700M --conf spark.eventLog.enabled=true /root/spark/spark-example-java-1.0.jar hdfs://k8s-master:9000/user/xuhang/core-site.xml
./sbin/start-history-server.sh

http://k8s-master:18080 by default
ref: https://spark.apache.org/docs/latest/monitoring.html

run on another machine (client mode)

In client mode, you can run spark-submit from any machine, as long as it is on the same local network.
For instance, on the centostest node:
mkdir /tmp/spark-events
./bin/spark-submit --class org.apache.spark.examples.JavaWordCount --master spark://192.168.77.130:7077 --deploy-mode client --executor-memory 700M --conf spark.eventLog.enabled=true /root/spark/spark-example-java-1.0.jar hdfs://k8s-master:9000/user/xuhang/core-site.xml
http://centostest:18080/ (view history on the machine running the submit binary)

run cluster mode

./bin/spark-submit --class org.apache.spark.examples.JavaWordCount --master spark://192.168.77.130:7077 --deploy-mode cluster --executor-memory 700M --conf spark.eventLog.enabled=true /root/spark/spark-example-java-1.0.jar hdfs://k8s-master:9000/user/xuhang/core-site.xml
The command above may fail with a jar-not-found error.
Note: in standalone cluster mode the jar file has to be uploaded to HDFS, because the driver may run on any node of the cluster.

stage

JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> output = counts.collect();

This program has two stages: the first ends at mapToPair, and the second ends at collect. Click into the job to see the DAG.
stage

Specifying the number of partitions

spark.default.parallelism
./bin/spark-submit --class org.apache.spark.examples.JavaWordCount --master spark://192.168.77.130:7077 --deploy-mode client --driver-memory=2g --executor-memory 2000M --conf spark.eventLog.enabled=true --conf spark.default.parallelism=4 /root/spark/spark-example-java-1.0.jar hdfs://k8s-master:9000/user/xuhang/mylarge4.txt

YARN components and architecture

  • ResourceManager: a global process
  • NodeManager: a process running on every node
  • ApplicationMaster: an application-specific process
  • Scheduler: a component of the ResourceManager
  • Container: a bundle of CPU and memory resources on a node
    A Container is YARN's abstraction of a machine's compute resources - essentially a slice of CPU and memory - and every application runs inside Containers. An ApplicationMaster is the abstraction for one application running on YARN; it is an instance of a specific application type and operates at the application level. Its main job is to request compute resources (Containers) from the global ResourceManager and to interact with NodeManagers to run and monitor the concrete tasks. The Scheduler is the ResourceManager component dedicated to resource management; it allocates Container resources on the NodeManagers, and each NodeManager keeps reporting its Container usage back to the ResourceManager.
    The ResourceManager and NodeManager processes mainly handle system management. The ResourceManager contains a Scheduler responsible for allocating resources to the applications in the cluster. For every application instance there is a corresponding ApplicationMaster, which obtains Containers from the ResourceManager to run the actual job and tracks and monitors that job's progress.
    Because the ApplicationMaster requests compute resources (Containers) from the global ResourceManager, it itself runs inside a Container on one of the NodeManager nodes, not on the ResourceManager.
    https://mapr.com/docs/60/MapROverview/c_application_master.html
    https://blog.csdn.net/suifeng3051/article/details/49486927 (this article explains it very well)

To summarize

Hadoop itself has four kinds of daemons: namenode, datanode, resource manager and node manager.
With these four, you could set up a real distributed cluster to experiment with, plus ZooKeeper to avoid single points of failure.
namenode and datanode belong to HDFS; resource manager and node manager belong to YARN.

note

spark-submit --class com.cjh.test.WordCount --conf spark.default.parallelism=12 --conf spark.executor.memory=800m --conf spark.executor.cores=2 --conf spark.cores.max=6 my.jar
To pass multiple settings on the command line, use a separate --conf for each one.

spark basic

Partitions and the number of CPU cores

The number of partitions is usually 2 to 4 times the number of CPU cores.
Suppose there is 20 GB of data on HDFS; dividing by the 128 MB block size gives about 160 partitions, while the cluster only has 50 cores in total.
160 partitions on 50 cores is still fine: each core just has to run about 3 tasks.

If there were 160 cores, each partition would map to exactly one task, which is the fastest case.

The partition count should not be too high either: too many partitions means too many tasks, which puts a lot of pressure on the nodes.

Each partition is processed by one task, so 160 partitions means 160 tasks spread over the 50 CPU cores.

PostgreSQL installation

Install

https://www.postgresql.org/download/linux/redhat/
The official site provides a yum-repo-based install, which is very convenient; you can install a specific version with yum install postgresql10.

yum install postgresql10
yum install postgresql10-server
/usr/pgsql-10/bin/postgresql-10-setup initdb
systemctl enable postgresql-10
systemctl start postgresql-10

Configuration

Note that the configuration files live in /var/lib/pgsql/10/data, not in /usr/pgsql-10.
You can check the IP bindings with netstat -nlt; if you edit the listen settings under /usr/pgsql-10 instead,
you will find that port 5432 still listens only on 127.0.0.1 rather than accepting remote connections from any IP.
https://blog.bigbinary.com/2016/01/23/configure-postgresql-to-allow-remote-connection.html - this article mentions it.

The pg_hba.conf entries must be set to trust, otherwise remote clients such as DataGrip still cannot log in.

host    all             all              0.0.0.0/0                       trust
host all all ::/0 trust

Also note that the PostgreSQL installation creates a Linux user named postgres by default.
You can run su - postgres and then psql to enter the SQL shell,
change the password with ALTER USER postgres WITH PASSWORD 'xuhang',
and finally log in from a client with the password xuhang; a quick way to verify the remote connection is sketched below.
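To double-check the remote connection from code, a minimal JDBC sketch like the one below works (the host IP is an assumption - use whichever machine PostgreSQL is installed on - and the PostgreSQL JDBC driver must be on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Connects to the remote PostgreSQL instance configured above and prints the server version.
public class PgConnectionTest {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://192.168.77.130:5432/postgres"; // default database "postgres"; host is an assumption
        try (Connection conn = DriverManager.getConnection(url, "postgres", "xuhang");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT version()")) {
            if (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}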

Other references

https://www.postgresql.org/docs/10/auth-pg-hba-conf.html

Summary

For a quick install, just follow the steps below as root:
yum install https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm
yum install postgresql10
yum install postgresql10-server
/usr/pgsql-10/bin/postgresql-10-setup initdb
systemctl enable postgresql-10
systemctl start postgresql-10
Once installed, start configuring:
su - postgres
psql
ALTER USER postgres WITH PASSWORD 'xuhang'

Append
host    all             all              0.0.0.0/0                       trust
host    all             all              ::/0                            trust
to /var/lib/pgsql/10/data/pg_hba.conf

Update to listen_addresses = '*'
in postgresql.conf

systemctl restart postgresql-10
Now a client can log in: the default database is postgres, the default user is postgres, and the password is the one changed with ALTER USER.

old Hadoop Architecture

For the pre-YARN Hadoop architecture, see the article below:
https://www.journaldev.com/8808/hadoop1-architecture-and-how-major-components-works
MapReduce's JobTracker ran together with the HDFS NameNode, and MapReduce's TaskTrackers ran together with the HDFS DataNodes.

YARN architecture
https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/
Comparing the old and new Hadoop MapReduce frameworks
A detailed comparison of the old and new MapReduce frameworks shows several notable changes:

First, the client side stays the same: the APIs and interfaces it calls remain largely compatible, so the change is transparent to developers and existing code needs no major rewrite (see section 2.3 of that article for demo code). However, the JobTracker and TaskTracker at the core of the old framework are gone, replaced by three components: ResourceManager, ApplicationMaster and NodeManager.