hdfs有namenode、secondraynamenode、datanode组成。为n+1模式
先分析宕机后的损失,宕机后直接导致client无法访问,内存中的元数据丢失,但是硬盘中的元数据应该还存在,如果只是节点挂了,重启即可,如果是机器挂了,重启机器后看节点是否能重启,不能重启就要找到原因修复了。但是最终的解决方案应该是在设计集群的初期就考虑到这个问题,做namenode的HA。
namenode对数据的管理采用了三种存储形式:
namenode和secondarynamenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondarynamenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据
(1)局部聚合加全局聚合。
第一次在map阶段对那些导致了数据倾斜的key加上1到n的随机前缀,这样本来相
同的key也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
第二次mapreduce,去掉key的随机前缀,进行全局聚合。
思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第
二次再根据去掉key的随机前缀,按原key进行reduce处理。
这个方法进行两次mapreduce,性能稍差。
(2)增加Reducer,提升并行度
JobConf.setNumReduceTasks(int)
(3)实现自定义分区
根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer
shuffle:洗牌、发牌——(核心机制:缓存,数据分区,排序,Merge进行局部value的合并);
具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;
1)Map方法之后Reduce方法之前这段处理过程叫Shuffle
2)Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段快排;溢写产生大量溢写文件,需要对溢写文件进行归并排序;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。
3)每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。
在进入Reduce方法前,可以对数据进行分组操作。
map的数量由输入切片的数量决定,128M切分一个切片,只要是文件也分为一个切片,有多少个切片就有多少个mapTask。
reduce数量自己配置。
求平均数的时候就不需要用combiner,因为不会减少reduce执行数量。在其他的时候,可以依据情况,使用combiner,来减少map的输出数量,减少拷贝到reduce的文件,从而减轻reduce的压力,节省网络开销,提升执行效率
HDFS数据安全性如何保证?
在写入的时候不会重新重新分配datanode。如果写入时,一个datanode挂掉,会将已经写入的数据放置到queue的顶部,并将挂掉的datanode移出pipline,将数据写入到剩余的datanode,在写入结束后,namenode会收集datanode的信息,发现此文件的replication没有达到配置的要求(default=3),然后寻找一个datanode保存副本。
0)HDFS小文件影响
(1)影响NameNode的寿命,因为文件元数据存储在NameNode的内存中
(2)影响计算引擎的任务数量,比如每个小的文件都会生成一个Map任务
1)数据输入小文件处理:
(1)合并小文件:对小文件进行归档(Har)、自定义Inputformat将小文件存储成SequenceFile文件。
(2)采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。
(3)对于大量小文件Job,可以开启JVM重用。
2)Map阶段
(1)增大环形缓冲区大小。由100m扩大到200m
(2)增大环形缓冲区溢写的比例。由80%扩大到90%
(3)减少对溢写文件的merge次数。(10个文件,一次20个merge)
(4)不影响实际业务的前提下,采用Combiner提前合并,减少I/O。
3)Reduce阶段
(3)规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)增加每个Reduce去Map中拿数据的并行数
(5)集群性能可以的前提下,增大Reduce端存储数据内存的大小。
4)IO传输
(2)使用SequenceFile二进制文件
5)整体
(1)MapTask默认内存大小为1G,可以增加MapTask内存大小为4-5g
(2)ReduceTask默认内存大小为1G,可以增加ReduceTask内存大小为4-5g
(3)可以增加MapTask的cpu核数,增加ReduceTask的CPU核数
(4)增加每个Container的CPU核数和内存大小
(5)调整每个MapTask和ReduceTask最大重试次数
1.NameNode它是hadoop中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有metadate。
3.DataNode它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个datanode守护进程。
4.ResourceManager(JobTracker)JobTracker负责调度DataNode上的工作。每个DataNode有一个TaskTracker,它们执行实际工作。
5.NodeManager(TaskTracker)执行任务
6.DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为ActiveNN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。
7.JournalNode高可用情况下存放namenode的editlog文件.
Job是我们对一个完整的mapreduce程序的抽象封装
Task是job运行时,每一个处理阶段的具体实例,如maptask,reducetask,maptask和reducetask都会有多个并发运行的实例
HDFS高可用原理:
HadoopHA(HighAvailable)通过同时配置两个处于Active/Passive模式的Namenode来解决上述问题,状态分别是Active和Standby.StandbyNamenode作为热备份,从而允许在机器发生故障时能够快速进行故障转移,同时在日常维护的时候使用优雅的方式进行Namenode切换。Namenode只能配置一主一备,不能多于两个Namenode。
主Namenode处理所有的操作请求(读写),而Standby只是作为slave,维护尽可能同步的状态,使得故障时能够快速切换到Standby。为了使StandbyNamenode与ActiveNamenode数据保持同步,两个Namenode都与一组JournalNode进行通信。当主Namenode进行任务的namespace操作时,都会确保持久会修改日志到JournalNode节点中。StandbyNamenode持续监控这些edit,当监测到变化时,将这些修改同步到自己的namespace。
当进行故障转移时,Standby在成为ActiveNamenode之前,会确保自己已经读取了JournalNode中的所有edit日志,从而保持数据状态与故障发生前一致。
为了确保故障转移能够快速完成,StandbyNamenode需要维护最新的Block位置信息,即每个Block副本存放在集群中的哪些节点上。为了达到这一点,Datanode同时配置主备两个Namenode,并同时发送Block报告和心跳到两台Namenode。
确保任何时刻只有一个Namenode处于Active状态非常重要,否则可能出现数据丢失或者数据损坏。当两台Namenode都认为自己的ActiveNamenode时,会同时尝试写入数据(不会再去检测和同步数据)。为了防止这种脑裂现象,JournalNodes只允许一个Namenode写入数据,内部通过维护epoch数来控制,从而安全地进行故障转移。
fsimage:filesystemimage的简写,文件镜像。
客户端修改文件时候,先更新内存中的metadata信息,只有当对文件操作成功的时候,才会写到editlog。
fsimage是文件meta信息的持久化的检查点。secondarynamenode会定期的将fsimage和editlog合并dump成新的fsimage
FIFOScheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。
在Fair(公平)调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
需要注意的是,在下图Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。
mapjoin方案
join因为空值导致长尾(key为空值是用随机值代替)
join因为热点值导致长尾,也可以将热点数据和非热点数据分开处理,最后合并
UDF操作作用于单个数据行,并且产生一个数据行作为输出。大多数函数都属于这一类(比如数学函数和字符串函数)。
UDAF接受多个输入数据行,并产生一个输出数据行。像COUNT和MAX这样的函数就是聚集函数。
UDTF操作作用于单个数据行,并且产生多个数据行-------一个表作为输出。lateralviewexplore()
简单来说:
UDF:返回对应值,一对一
UDAF:返回聚类值,多对一
UDTF:返回拆分值,一对多
内部表:加载数据到hive所在的hdfs目录,删除时,元数据和数据文件都删除
外部表:不加载数据到hive所在的hdfs目录,删除时,只删除表结构。
这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
insertinto:将数据写到表中
overridewrite:覆盖之前的内容。
hive的条件判断(if、coalesce、case)
hive主要是做离线分析的
hive建表有三种方式
hive表有2种:内部表和外部表
union去重
unionoll不去重
1)groupby
注:groupby优于distinctgroup
情形:groupby维度过小,某值的数量过多
后果:处理某值的reduce非常耗时
解决方式:采用sum()groupby的方式来替换count(distinct)完成计算。
2)count(distinct)
count(distinctxx)
情形:某特殊值过多
后果:处理此特殊值的reduce耗时;只有一个reduce任务
解决方式:countdistinct时,将值为空的情况单独处理,比如可以直接过滤空值的行,
在最后结果中加1。如果还有其他计算,需要进行groupby,可以先将值为空的记录单独处
理,再和其他计算结果进行union。
3)mapjoin
4)不同数据类型关联产生数据倾斜
情形:比如用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时。
默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配
到一个Reducer中。
解决方式:把数字类型转换成字符串类型
select*fromusersa
leftouterjoinlogsb
ona.usr_id=cast(b.user_idasstring)
5)开启数据倾斜时负载均衡
sethive.groupby.skewindata=true;
思想:就是先随机分发并处理,再按照keygroupby来分发处理。
操作:当选项设定为true,生成的查询计划会有两个MRJob。
第一个MRJob中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分
聚合操作,并输出结果,这样处理的结果是相同的GroupByKey有可能被分发到不同的
Reduce中,从而达到负载均衡的目的;
第二个MRJob再根据预处理的数据结果按照GroupByKey分布到Reduce中(这个过
程可以保证相同的原始GroupByKey被分布到同一个Reduce中),最后完成最终的聚合操
作。
点评:它使计算变成了两个mapreduce,先在第一个中在shuffle过程partition时随机
给key打标记,使每个key随机均匀分布到各个reduce上计算,但是这样只能完成部分
计算,因为相同key没有分配到相同reduce上。
所以需要第二次的mapreduce,这次就回归正常shuffle,但是数据分布不均匀的问题在第
一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次
mr中随机分布到各个节点完成。
6)控制空值分布
将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多
个Reducer。
注:对于异常值如果不需要的话,最好是提前在where条件里过滤掉,这样可以使计算
量大大减少
1)MapJoin
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换
成CommonJoin,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小
表全部加载到内存在map端进行join,避免reducer处理。
2)行列过滤
列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT*。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那
么就会先全表关联,之后再过滤。
3)列式存储
4)采用分区技术
5)合理设置Map数
(1)通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
(2)是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件
(3)是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只
有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map
任务去做,肯定也比较耗时。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
6)小文件进行合并
在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行
合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
7)合理设置Reduce数
Reduce个数并不是越多越好
(2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那
么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce
数;使单个Reduce任务处理数据量大小要合适;
8)常用参数
//输出合并小文件
SEThive.merge.mapfiles=true;--默认true,在map-only任务结束时合并
小文件
SEThive.merge.mapredfiles=true;--默认false,在map-reduce任务结
束时合并小文件
SEThive.merge.size.per.task=268435456;--默认256M
SEThive.merge.smallfiles.avgsize=16777216;--当输出文件的平均大小
小于16m该值时,启动一个独立的map-reduce任务进行文件merge
9)开启map端combiner(不影响最终业务逻辑)
sethive.map.aggr=true;
10)压缩(选择快的)
设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读
写和网络传输,能提高很多效率)
11)开启JVM重用
delet删除数据
drop删除表
truncate摧毁表结构并重建
hive默认的字段分隔符为ascii码的控制符\001(^A),建表的时候用fieldsterminatedby'\001'
遇到过字段里边有\t的情况,自定义InputFormat,替换为其他分隔符再做后续处理
分区表:原来的一个大表存储的时候分成不同的数据目录进行存储。如果说是单分区表,那么在表的目录下就只有一级子目录,如果说是多分区表,那么在表的目录下有多少分区就有多少级子目录。不管是单分区表,还是多分区表,在表的目录下,和非最终分区目录下是不能直接存储数据文件的
分桶表:原理和hashpartitioner一样,将hive中的一张表的数据进行归纳分类的时候,归纳分类规则就是hashpartitioner。(需要指定分桶字段,指定分成多少桶)
分区表和分桶的区别除了存储的格式不同外,最主要的是作用:
MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。Hive0.7之前,需要使用hint提示/*+mapjoin(table)*/才会执行MapJoin,否则执行CommonJoin,但在0.7版本之后,默认自动会转换MapJoin,由参数hive.auto.convert.join来控制,默认为true.假设a表为一张大表,b为小表,并且hive.auto.convert.join=true,那么Hive在执行时候会自动转化为MapJoin。
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。减少昂贵的shuffle操作及reduce操作MapJoin分为两个阶段:
每天收集到的ng日志和埋点日志数据,需要做大量的统计数据分析,所以可以使用外部表进行存储,方便数据的共享,并且在对表做操作的时候不会误删原始数据。
在做统计分析时候用到的中间表,结果表可以使用内部表,因为这些数据不需要共享,使用内部表更为合适。并且很多时候分区表我们只需要保留最近3天的数据,用外部表的时候删除分区时无法删除数据
round(DOUBLEa)
floor(DOUBLEa)
ceil(DOUBLEa)
rand()
size(Map
map_keys(Map
map_values(Map
array_contains(Array
sort_array(Array
cast(expras
date_format函数(根据格式整理日期)date_add、date_sub函数(加减日期)next_day函数last_day函数(求当月最后一天日期)collect_set函数get_json_object解析json函数
from_unixtime(bigintunixtime,stringformat)to_date(stringtimestamp)year(stringdate)month(stringdate)hour(stringdate)weekofyear(stringdate)datediff(stringenddate,stringstartdate)add_months(stringstart_date,intnum_months)date_format(date/timestamp/stringts,stringfmt)
if(booleantestCondition,TvalueTrue,TvalueFalseOrNull)
nvl(Tvalue,Tdefault_value)
COALESCE(Tv1,Tv2,...)
CASEaWHENbTHENc[WHENdTHENe]*[ELSEf]END
isnull(a)
isnotnull(a)
concat(string|binaryA,string|binaryB...)
concat_ws(stringSEP,stringA,stringB...)
get_json_object(stringjson_string,stringpath)
length(stringA)
lower(stringA)lcase(stringA)
parse_url(stringurlString,stringpartToExtract[,stringkeyToExtract])
regexp_replace(stringINITIAL_STRING,stringPATTERN,stringREPLACEMENT)
reverse(stringA)
split(stringstr,stringpat)
substr(string|binaryA,intstart)substring(string|binaryA,intstart)
countsumminmaxavg
explode(array
explode(ARRAY)
json_tuple(jsonStr,k1,k2,...)
parse_url_tuple(url,p1,p2,...)
转换(Transformation)现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。
转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join,coalesce等等。
动作(Action)在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。
动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
collect该方法把数据收集到driver端Array数组类型
所有的transformation只有遇到action才能被执行。
当触发执行action之后,数据类型不再是rdd了,数据就会存储到指定文件系统中,或者直接打印结果或者收集起来。
1.集合并行化创建(有数据)
valarr=Array(1,2,3,4,5)
valrdd=sc.parallelize(arr)
valrdd=sc.makeRDD(arr)
2.读取外部文件系统,如hdfs,或者读取本地文件(最常用的方式)(没数据)
valrdd2=sc.textFile("hdfs://hdp-01:9000/words.txt")
//读取本地文件
valrdd2=sc.textFile(“file:///root/words.txt”)
3.从父RDD转换成新的子RDD
调用Transformation类的方法,生成新的RDD
Worker的功能:定时和master通信;调度并管理自身的executor
executor:由Worker启动的,程序最终在executor中运行,(程序运行的一个容器)
spark-submit命令执行时,会根据master地址去向Master发送请求,
Master接收到Dirver端的任务请求之后,根据任务的请求资源进行调度,(打散的策略),尽可能的把任务资源平均分配,然后向WOrker发送指令
Worker收到Master的指令之后,就根据相应的资源,启动executor(cores,memory)
executor会向dirver端建立请求,通知driver,任务已经可以运行了
driver运行任务的时候,会把任务发送到executor中去运行。
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions,shuffle=true)
2)区别:
repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce
sortBy既可以作用于RDD[K],还可以作用于RDD[(k,v)]
sortByKey只能作用于RDD[K,V]类型上。
使用foreachPartition
*1,mapmapPartition是转换类的算子,有返回值
*2,写mysql,redis的连接
foreach*100万100万次的连接
foreachPartions*200个分区200次连接一个分区中的数据,共用一个连接
foreachParititon每次迭代一个分区,foreach每次迭代一个元素。
该方法没有返回值,或者Unit
主要作用于,没有返回值类型的操作(打印结果,写入到mysql数据库中)
在写入到redis,mysql的时候,优先使用foreachPartititon
reduceByKey会传一个聚合函数,相当于groupByKey+mapValues
reduceByKey会有一个分区内聚合,而groupByKey没有最核心的区别
结论:reduceByKey有分区内聚合,更高效,优先选择使用reduceByKey。
都是做RDD持久化的
1.缓存,是在触发action之后,把数据写入到内存或者磁盘中。不会截断血缘关系
(设置缓存级别为memory_only:内存不足,只会部分缓存或者没有缓存,缓存会丢失,memory_and_disk:内存不足,会使用磁盘)
2.checkpoint也是在触发action之后,执行任务。单独再启动一个job,负责写入数据到hdfs中。(把rdd中的数据,以二进制文本的方式写入到hdfs中,有几个分区,就有几个二进制文件)
3.某一个RDD被checkpoint之后,他的父依赖关系会被删除,血缘关系被截断,该RDD转换成了CheckPointRDD,以后再对该rdd的所有操作,都是从hdfs中的checkpoint的具体目录来读取数据。缓存之后,rdd的依赖关系还是存在的。
当前文件a.txt的格式,请统计每个单词出现的次数
A,b,c
B,b,f,e
objectWordCount{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")valsc=newSparkContext(conf)varsData:RDD[String]=sc.textFile("a.txt")valsortData:RDD[(String,Int)]=sData.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)sortData.foreach(print)}}16.spark应用程序的执行命令是什么?/usr/local/spark-current2.3/bin/spark-submit\
--classcom.wedoctor.Application\
--masteryarn\
--deploy-modeclient\
--driver-memory1g\
--executor-memory2g\
--queueroot.wedw\
--num-executors200\
--jars/home/pgxl/liuzc/config-1.3.0.jar,/home/pgxl/liuzc/hadoop-lzo-0.4.20.jar,/home/pgxl/liuzc/elasticsearch-hadoop-hive-2.3.4.jar\
/home/pgxl/liuzc/sen.jar
其中,standalone模式,sparkonyarn模式,sparkonmesos模式是集群模式
使用广播变量,每个Executor的内存中,只驻留一份变量副本,而不是对每个task都传输一次大变量,省了很多的网络传输,对性能提升具有很大帮助,而且会通过高效的广播算法来减少传输代价。
valarr=newArrayList[String];这里会报错,需要改成valarr:Array[String]=newArray[String](10)
arr.foreach(println)打印不会报空指针
reduceBykey:
groupByKey:
…ByKey:
1)RDD
优点:
编译时类型安全
编译时就能检查出类型错误
面向对象的编程风格
直接通过类名点的方式来操作数据
缺点:
序列化和反序列化的性能开销
无论是集群间的通信,还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
GC的性能开销,频繁的创建和销毁对象,势必会增加GC
2)DataFrame
DataFrame引入了schema和off-heap
schema:RDD每一行的数据,结构都是一样的,这个结构就存储在schema中。Spark通过schema就能够读懂数据,因此在通信和IO时就只需要序列化和反序列化数据,而结构的部分就可以省略了。
3)DataSet
DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。
当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。
三者之间的转换:
RDD中的数据在数据源,RDD只是一个抽象的数据集,我们通过对RDD的操作就相当于对数据进行操作。
数据在第一执行cache算子时会被加载到各个Executor进程的内存中,第二次就会直接从内存中读取而不会区磁盘。
和Mr一样,但是Spark默认最少有两个分区。
父RDD的一个分区中的数据有可能被分配到子RDD的多个分区中
18619304961,18619304064,186193008,186193009
18619304962,18619304065,186193007,186193008
18619304963,18619304066,186193006,186193010
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
SparkonYarn作业执行流程?
1.SparkYarnClient向Yarn中提交应用程序。2.ResourceManager收到请求后,在集群中选择一个NodeManager,并为该应用程序分配一个Container,在这个Container中启动应用程序的ApplicationMaster,ApplicationMaster进行SparkContext等的初始化。3.ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束。4.ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,并在获得的Container中启动CoarseGrainedExecutorBackend,启动后会向ApplicationMaster中的SparkContext注册并申请Task。5.ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。6.应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。
yarn-client和yarn-cluster有什么区别?
1.理解YARN-Client和YARN-Cluster深层次的区别之前先清楚一个概念:ApplicationMaster。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别2.YARN-Cluster模式下,Driver运行在AM(ApplicationMaster)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业3.YARN-Client模式下,ApplicationMaster仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开
SparkflatMap源码:
/***ReturnanewRDDbyfirstapplyingafunctiontoallelementsofthis*RDD,andthenflatteningtheresults.*/defflatMap[U:ClassTag](f:T=>TraversableOnce[U]):RDD[U]=withScope{valcleanF=sc.clean(f)newMapPartitionsRDD[U,T](this,(context,pid,iter)=>iter.flatMap(cleanF))}ScalaflatMap源码:
/**Createsanewiteratorbyapplyingafunctiontoallvaluesproducedbythisiterator*andconcatenatingtheresults.**@paramfthefunctiontoapplyoneachelement.*@returntheiteratorresultingfromapplyingthegiveniterator-valuedfunction*`f`toeachvalueproducedbythisiteratorandconcatenatingtheresults.*@noteReuse:$consumesAndProducesIterator*/defflatMap[B](f:A=>GenTraversableOnce[B]):Iterator[B]=newAbstractIterator[B]{privatevarcur:Iterator[B]=emptyprivatedefnextCur(){cur=f(self.next()).toIterator}defhasNext:Boolean={//Equivalenttocur.hasNext||self.hasNext&&{nextCur();hasNext}//butslightlyshorterbytecode(betterJVMinlining!)while(!cur.hasNext){if(!self.hasNext)returnfalsenextCur()}true}defnext():B=
新的RDD,也可以理解成原本的每个元素由横向执行函数f后再变为纵向。画红部分一直在回调,当RDD内没有元素为止。
本质上kafka只支持Topic;
每个group中可以有多个consumer,每个consumer属于一个consumergroup;
通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;
那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0:producer不会等待broker发送ack
1:当leader接收到消息之后发送ack
-1:当所有的follower都同步消息成功后发送ack
request.required.acks=0
伪命题
每个分区内,每条消息都有一个offset,故只能保证分区内有序。
如果要全局有序的,必须保证生产有序,存储有序,消费有序。
由于生产可以做集群,存储可以分片,消费可以设置为一个consumerGroup,要保证全局有序,就需要保证每个环节都有序。
只有一个可能,就是一个生产者,一个partition,一个消费者。这种场景和大数据应用场景相悖。
Kafka的作用是解耦,如果直接从日志服务器上采集的话,实时离线都要采集,等于要采集两份数据,而使用了kafka的话,只需要从日志服务器上采集一份数据,然后在kafka中使用不同的两个组读取就行了
KafkaManager
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-SyncReplicas)列表,新加入的Follower也会先存放在OSR中。
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
ISR:in-syncreplicaset(ISR),与leader保持同步的follower集合
AR:分区的所有副本
LEO:每个副本的最后条消息的offset
HW:一个分区中所有副本最小的offset
先提交offset,后消费,有可能造成数据的重复
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafkaController负责topic的创建工作,并更新metadatacache
可以增加
bin/kafka-topics.sh--zookeeperlocalhost:2181/kafka--alter--topictopic-config--partitions3
不可以减少,被删除的分区数据难以处理。
__consumer_offsets,保存消费者offset
负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
不能及时与leader同步,暂时踢出ISR,等其追上leader之后再重新加入
在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。
每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。
一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜
HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。
设计原则
1)rowkey长度原则
2)rowkey散列原则
3)rowkey唯一原则
如何设计
1)生成随机数、hash、散列值
2)字符串反转
3)字符串拼接
增强hbase查询数据的功能
减少服务端返回给客户端的数据量
答:宕机分为HMaster宕机和HRegisoner宕机,如果是HRegisoner宕机,HMaster会将其所管理的region重新分布到其他活动的RegionServer上,由于数据和日志都持久在HDFS中,该操作不会导致数据丢失。所以数据的一致性和安全性是有保障的。
如果是HMaster宕机,HMaster没有单点问题,HBase中可以启动多个HMaster,通过Zookeeper的MasterElection机制保证总有一个Master运行。即ZooKeeper会保证总会有一个HMaster在对外提供服务。
共同点:1.hbase与hive都是架构在hadoop之上的。都是用hadoop作为底层存储区别:2.Hive是建立在Hadoop之上为了减少MapReducejobs编写工作的批处理系统,HBase是为了支持弥补Hadoop对实时操作的缺陷的项目。3.想象你在操作RMDB数据库,如果是全表扫描,就用Hive+Hadoop,如果是索引访问,就用HBase+Hadoop。4.Hivequery就是MapReducejobs可以从5分钟到数小时不止,HBase是非常高效的,肯定比Hive高效的多。5.Hive本身不存储和计算数据,它完全依赖于HDFS和MapReduce,Hive中的表纯逻辑。6.hive借用hadoop的MapReduce来完成一些hive中的命令的执行7.hbase是物理表,不是逻辑表,提供一个超大的内存hash表,搜索引擎通过它来存储索引,方便查询操作。8.hbase是列存储。9.hdfs作为底层存储,hdfs是存放文件的系统,而Hbase负责组织文件。10.hive需要用到hdfs存储文件,需要用到MapReduce计算框架。
1/客户端要连接zookeeper,从zk的/hbase节点找到hbase:meta表所在的regionserver(host:port);
2/regionserver扫描hbase:meta中的每个region的起始行健,对比r000001这条数据在那个region的范围内;
3/从对应的info:serverkey中存储了region是有哪个regionserver(host:port)在负责的;
4/客户端直接请求对应的regionserver;
5/regionserver接收到客户端发来的请求之后,就会将数据写入到region中
1/首先Client连接zookeeper,找到hbase:meta表所在的regionserver;
2/请求对应的regionserver,扫描hbase:meta表,根据namespace、表名和rowkey在meta表中找到r00001所在的region是由那个regionserver负责的;
3/找到这个region对应的regionserver
4/regionserver收到了请求之后,扫描对应的region返回数据到Client
(先从MemStore找数据,如果没有,再到BlockCache里面读;BlockCache还没有,再到StoreFile上读(为了读取的效率);
如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。)
1)当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;
2)并将数据存储到HDFS中;
3)在HLog中做标记点。
Hmaster
1、管理用户对Table的增、删、改、查操作;
2、记录region在哪台Hregionserver上
3、在RegionSplit后,负责新Region的分配;
4、新机器加入时,管理HRegionServer的负载均衡,调整Region分布
5、在HRegionServer宕机后,负责失效HRegionServer上的Regions迁移。
Hgionserver
HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBASE中最核心的模块。
HRegionServer管理了很多table的分区,也就是region。
HBase有多个RegionServer,每个RegionServer里有多个Region,一个Region中存放着若干行的行键以及所对应的数据,一个列族是一个文件夹,如果经常要搜索整个一条数据,列族越少越好,如果只有一部分的数据需要经常被搜索,那么将经常搜索的建立一个列族,其他不常搜索的建立列族检索较快。
(1)加盐这里所说的加盐不是密码学中的加盐,而是在rowkey的前面增加随机数,具体就是给rowkey分配一个随机前缀以使得它和之前的rowkey的开头不同。给多少个前缀?这个数量应该和我们想要分散数据到不同的region的数量一致(类似hive里面的分桶)。(自己理解:即region数量是一个范围,我们给rowkey分配一个随机数,前缀(随机数)的范围是region的数量)加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。
(2)哈希哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据。
(3)反转第三种防止热点的方法是反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。反转rowkey的例子:以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,从而避免诸如139、158之类的固定号码开头导致的热点问题。
(6)其他办法列族名的长度尽可能小,最好是只有一个字符。冗长的属性名虽然可读性好,但是更短的属性名存储在HBase中会更好。也可以在建表时预估数据规模,预留region数量,例如create‘myspace:mytable’,SPLITS=>[01,02,03,,…99]
维表数据一般根据ods层数据加工生成,在设计宽表的时候,可以适当的用一些维度退化手法,将维度退化到事实表中,减少事实表和维表的关联
每个公司都会有点差别
ODS
ods.库名_表名_df/di/da/dz
CDM(dwd/dws)
dwd.主题_内容_df
1.数据量比较大
2.表中的部分字段会被更新
4.更新的比例和频率不是很大如果表中信息变化不是很大,每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费
业务库数据源:mysql,oracle,mongo
日志数据:ng日志,埋点日志
爬虫数据
ng日志表,三端(app,web,h5)中app端日志量最大,清洗入库后的数据一天大概xxxxW
没用到阿里那一套,数据平台为自研产品
airflow,azkaban,ooize,我们公司使用的是airflow
业务数据用的是datax
日志数据用的是logstash
logstash-->kafka-->logstash-->hdfs
根据表数据量及表特性,选择用全量表,增量表,追加表和拉链表处理
可以
改造过
1.直接存入mysql业务库,业务方直接读取
2.数据存入mysql,以接口的形式提供数据
3.数据存入kylin,需求方通过jdbc读取数据
拉链表:
快照表:按天分区,每一天的数据都是截止到那一天mysql的全量数据
不落,是内存计算
定期checkpoint存储opratorstate及keyedstate到stateBackend
开启checkpoint可以容错,程序自动重启的时候可以从checkpoint中恢复数据
1.开启checkpoint
2.source支持数据重发
3.sink支持事务,可以分2次提交,如kafka;或者sink支持幂等,可以覆盖之前写入的数据,如redis
满足以上三点,可以保证Exactly_Once
Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向YarnResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
集合是将所有数据加载到内存,然后通过集合的方法去内存中获取,而迭代器是一个对象,实现了Iterator接口,实现了接口的hasNext和Next方法。
1)线程安全性不同
HashMap是线程不安全的,HashTable是线程安全的,其中的方法是Synchronize的,
在多线程并发的情况下,可以直接使用HashTabl,但是使用HashMap时必须自己增加同步
处理。
2)是否提供contains方法
HashMap只有containsValue和containsKey方法;HashTable有contains、containsKey
和containsValue三个方法,其中contains和containsValue方法功能相同。
3)key和value是否允许null值
Hashtable中,key和value都不允许出现null值。HashMap中,null可以作为键,这
样的键只有一个;可以有一个或多个键所对应的值为null。
4)数组初始化和扩容机制
HashTable在不指定容量的情况下的默认容量为11,而HashMap为16,Hashtable不
要求底层数组的容量一定要为2的整数次幂,而HashMap则要求一定为2的整数次幂。
Hashtable扩容时,将容量变为原来的2倍加1,而HashMap扩容时,将容量变为原
来的2倍。
线程池分为单线程线程池,固定大小线程池,可缓冲的线程池
TreeMap会自动进行排序,根据key的Compare方法进行排序
HashSet是采用hash表来实现的。其中的元素没有按顺序排列,add()、remove()以及
contains()等方法都是复杂度为O(1)的方法。
TreeSet是采用树结构实现(红黑树算法)。元素是按顺序进行排列,但是add()、
remove()以及contains()等方法都是复杂度为O(log(n))的方法。它还提供了一些方法来处理
排序的set,如first(),last(),headSet(),tailSet()等等。
1、StringBuffer与StringBuilder中的方法和功能完全是等价的。
2、只是StringBuffer中的方法大都采用了synchronized关键字进行修饰,因此是线程
安全的,而StringBuilder没有这个修饰,可以被认为是线程不安全的。
3、在单线程程序下,StringBuilder效率更快,因为它不需要加锁,不具备多线程安全
而StringBuffer则每次都需要判断锁,效率相对更低
final:修饰符(关键字)有三种用法:修饰类、变量和方法。修饰类时,意味着它不
能再派生出新的子类,即不能被继承,因此它和abstract是反义词。修饰变量时,该变量
方法时,也同样只能使用,不能在子类中被重写。
finally:通常放在try…catch的后面构造最终执行代码块,这就意味着程序无论正常执
行还是发生异常,这里的代码只要JVM不关闭都能执行,可以将释放外部资源的代码写在
finally块中。
finalize:Object类中定义的方法,Java中允许使用finalize()方法在垃圾收集器将对象
从内存中清除出去之前做必要的清理工作。这个方法是由垃圾收集器在销毁对象时调用
的,通过重写finalize()方法可以整理系统资源或者执行其他清理工作。
==:如果比较的是基本数据类型,那么比较的是变量的值
如果比较的是引用数据类型,那么比较的是地址值(两个对象是否指向同一块内
存)
equals:如果没重写equals方法比较的是两个对象的地址值。
如果重写了equals方法后我们往往比较的是对象中的属性的内容
equals方法是从Object类中继承的,默认的实现就是使用==
Java类加载需要经历一下几个过程:
加载时类加载的第一个过程,在这个阶段,将完成一下三件事情:
验证的目的是为了确保Class文件的字节流中的信息不回危害到虚拟机.在该阶段主要完成以下四钟验证:
准备阶段是为类的静态变量分配内存并将其初始化为默认值,这些内存都将在方法区中进行分配。准备阶段不分配类中的实例变量的内存,实例变量将会在对象实例化时随着对象一起分配在Java堆中。
该阶段主要完成符号引用到直接引用的转换动作。解析动作并不一定在初始化动作完成之前,也有可能在初始化之后。
初始化时类加载的最后一步,前面的类加载过程,除了在加载阶段用户应用程序可以通过自定义类加载器参与之外,其余动作完全由虚拟机主导和控制。到了初始化阶段,才真正开始执行类中定义的Java程序代码。
判断一个对象是否存活有两种方法:
a.Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
b.Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中
c.一般的采集需求,通过对flume的简单配置即可实现
d.ume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景
只有Map阶段,没有Reduce阶段的任务。
/opt/module/sqoop/bin/sqoopimport\
--connect\
--username\
--password\
--target-dir\
--delete-target-dir\
--num-mappers\
--fields-terminated-by\
--query"$2"'and$CONDITIONS;'
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。
1)场景1:如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。
2)场景2:设置map数量为1个(不推荐,面试官想要的答案不只这个)
多个Map任务时,采用–staging-table方式,仍然可以解决数据一致性问题。
1)缓存穿透是指查询一个一定不存在的数据。由于缓存命不中时会去查询数据库,查不到
数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,造成缓存穿
透。
解决方案:
②采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定
不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力
就会造成缓存雪崩。
3)缓存击穿,是指一个key非常热点,在不停的扛着大并发,当这个key在失效的瞬间,
持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
可以设置key永不过期
1)RDB持久化:
②服务shutdown会自动持久化
③输入bgsave也会持久化
2)AOF:以日志形式记录每个更新操作
Redis重新启动时读取这个文件,重新执行新建、修改数据的命令恢复数据。
保存策略:
推荐(并且也是默认)的措施为每秒持久化一次,这种策略可以兼顾速度和安全性。
缺点:
1比起RDB占用更多的磁盘空间
2恢复备份速度要慢
3每次读写都同步的话,有一定的性能压力
4存在个别Bug,造成恢复不能
选择策略:
官方推荐:
string
字符串
list
可以重复的集合
set
不可以重复的集合
hash
类似于Map
zset(sortedset)
带分数的set
如果对数据不敏感,可以选单独用RDB;不建议单独用AOF,因为可能出现Bug;如果只是做纯内存缓存,可以都不用
乐观锁:执行操作前假设当前操作不会被打断(乐观)。基于这个假设,我们在做操作前不会锁定资源,万一发生了其他操作的干扰,那么本次操作将被放弃。Redis使用的就是乐观锁。
1)完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。
2)数据结构简单,对数据操作也简单,Redis中的数据结构是专门进行设计的
3)采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗
4)使用多路I/O复用模型,非阻塞IO
5)使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,
mysql-h192.168.1.130-uuser-p123456-P3306-Dwemeta_test
B树
B树不管叶子节点还是非叶子节点,都会保存数据,这样导致在非叶子节点中能保存的指针数量变少(有些资料也称为扇出)
指针少的情况下要保存大量数据,只能增加树的高度,导致IO操作变多,查询性能变低;
B+树
1.单一节点存储更多的元素,使得查询的IO次数更少。2.所有查询都要查找到叶子节点,查询性能稳定。3.所有叶子节点形成有序链表,便于范围查询,远远高于B-树
B树的每个结点包含着结点的值和结点所处的位置
packagecom.wedoctor;importjava.util.Arrays;publicclassMaxSum{publicstaticintfindMax(intarr[]){if(arr.length==1){returnarr[0];}intmid=(arr.length)/2;int[]leftArr=Arrays.copyOfRange(arr,0,mid);int[]rightArr=Arrays.copyOfRange(arr,mid,arr.length);intlenLeft=findMax(leftArr);intlenRight=findMax(rightArr);intlenMid=maxInMid(leftArr,rightArr);intmax=Math.max(Math.max(lenLeft,lenRight),lenMid);returnmax;}publicstaticintmaxInMid(intleft[],intright[]){intmaxLeft=0;intmaxRight=0;inttmpLeft=0;inttmpRight=0;for(inti=0;i