1. java.使用FileReader字符流统计一篇英文中的单词,要求如下
我已经给你写好了,你要求的功能都能实现,以下是源代码:
packageregular;
importjava.io.BufferedReader;
importjava.io.BufferedWriter;
importjava.io.File;
importjava.io.FileNotFoundException;
importjava.io.FileReader;
importjava.io.FileWriter;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.Collections;
importjava.util.Comparator;
importjava.util.HashMap;
importjava.util.List;
importjava.util.Map;
importjava.util.Map.Entry;
importjava.util.regex.Pattern;
/**
*统计一篇英文中的单词,要求如下:
*①一共出现了多少个单词;②有多少个互不相同的单词;③给出每个单词出现的频率,并将这些单词按频率大小顺序输出到文件words.txt文件中。
**/
publicclassWordStatistics{
=null;
=null;
publicstaticvoidmain(String[]args){
WordStatisticswordStatistics=newWordStatistics();
Map<String,Integer>word_map=wordStatistics.readFile();
// for(Map.Entry<String,Integer>mapping:word_map.entrySet()){
// System.out.println(mapping.getKey()+":"+mapping.getValue());
// }
wordStatistics.sortAndWrite(word_map);
}
/**
*从指定路径读取英文文章,并形成Map集合
**/
publicMap<String,Integer>readFile(){
//读文件
StringBufferstringBuffer=newStringBuffer();
try{
bufferedReader=newBufferedReader(newFileReader(newFile("F:\text1.txt")));//文件路径可自定义
Stringline="";
while((line=bufferedReader.readLine())!=null)
stringBuffer.append(line);
bufferedReader.close();
}catch(FileNotFoundExceptione){
e.printStackTrace();
}catch(IOExceptione){
e.printStackTrace();
}
//生成<单词,次数>键值对
Patternpattern=Pattern.compile("(\.)?");
String[]words=pattern.split(stringBuffer.toString());
Map<String,Integer>word_map=newHashMap<String,Integer>();
for(Strings:words){
if(!word_map.containsKey(s)){
word_map.put(s,1);
}
else{
intcount=word_map.get(s);
word_map.replace(s,count,count+1);
}
}
returnword_map;
}
/**
*按单词的出现频率排序并输出到words.txt文件中
**/
publicvoidsortAndWrite(Map<String,Integer>word_map){
//排序
List<Map.Entry<String,Integer>>list=newArrayList<Map.Entry<String,Integer>>(word_map.entrySet());
Collections.sort(list,newComparator<Map.Entry<String,Integer>>(){
publicintcompare(Entry<String,Integer>o1,
Entry<String,Integer>o2){
//TODOAuto-generatedmethodstub
returno1.getValue().compareTo(o2.getValue());
}
});
//写入文件
try{
bufferedWriter=newBufferedWriter(newFileWriter(newFile("F:\words.txt")));
bufferedWriter.write("一共出现了"+word_map.size()+"个单词,每个单词和它出现的频率分别是:");
bufferedWriter.flush();
bufferedWriter.newLine();
for(Map.Entry<String,Integer>mapping:list){
bufferedWriter.write(mapping.getKey()+":"+mapping.getValue());
bufferedWriter.flush();
bufferedWriter.newLine();
}
bufferedWriter.close();
System.out.println("WorkOut");
}catch(IOExceptione){
e.printStackTrace();
}
}
}
测试用例:
She had been shopping with her Mom in Wal-Mart. She must have been 6 years old, this beautiful brown haired, freckle-faced image of innocence. It was pouring outside. The kind of rain that gushes over the top of rain gutters, so much in a hurry to hit the Earth, it has no time to flow down the spout.
输出:
(PS:这可全部是原创手写的,望采纳)
2. jstorm 核心
生成Topology
IRichSpout
IRichSpout 为最简单的Spout接口
其中注意:
=>spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个=>task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
=>open是当task起来后执行的初始化动作
=>close是当task被shutdown后执行的动作
=>activate 是当task被激活时,触发的动作
=>deactivate 是task被deactive时,触发的动作
=>nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
=>ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
=>fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
=>declareOutputFields, 定义spout发送数据,每个字段的含义
=>getComponentConfiguration 获取本spout的component 配置
Bolt
其中注意:
=>bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
=>prepare是当task起来后执行的初始化动作
=>cleanup是当task被shutdown后执行的动作
=>execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
=>declareOutputFields, 定义bolt发送数据,每个字段的含义
=>getComponentConfiguration 获取本bolt的component 配置
打包
提交jar
xxxx.jar 为打包后的jar
com.alibaba.xxxx.xx 为入口类,即提交任务的类
parameter即为提交参数
Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树(因为一个tuple通过spout发出了,经过每一个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。
Acker跟踪算法的原理:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。
要实现ack机制:
阿里自己的Jstorm会提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
这样更合理一些, 可以直接取得系统cache的msg values
ack机制即,spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
在规定的时间内(默认是30秒),没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。
l或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
注意,我开始以为如果继承BaseBasicBolt那么程序抛出异常,也会让spout进行重发,但是我错了,程序直接异常停止了
这里我以分布式程序入门案例worldcount为例子吧。
有人问到Storm 是怎么处理重复的tuple?
因为Storm 要保证tuple 的可靠处理,当tuple 处理失败或者超时的时候,spout 会fail并重新发送该tuple,那么就会有tuple 重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。不过也有一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后
续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用 MySQL 、MemCached 或者 Redis 根据逻辑主键来去重。
(3)使用bloom filter 做过滤,简单高效。
在学习storm的过程中,有不少人对storm的Spout组件中的ack及fail相关的问题存在困惑,这里做一个简要的概述。
Storm保证每一个数据都得到有效处理,这是如何保证的呢?正是ack及fail机制确保数据都得到处理的保证,但是storm只是提供给我们一个接口,而具体的方法得由我们自己来实现。例如在spout下一个拓扑节点的bolt上,我们定义某种情况下为数据处理失败,则调用fail,则我们可以在fail方法中进行数据重发,这样就保证了数据都得到了处理。其实,通过读storm的源码,里面有讲到,有些类(BaseBasicBolt?)是会自动调用ack和fail的,不需要我们程序员去ack和fail,但是其他Bolt就没有这种功能了。
3. wso2-ei 编译
在众多规则引擎中,找到了 siddhi 能处理流式数据,支持 etl 等功能,但其为基础的单机版本,除非启动多个实例,网上有 siddhi-storm 的版本,将 siddhi 作为 storm 中的 Bolt 进行执行。 wso2 公司自己的实现也是依赖 storm 实现分布式实时流处理。
在 wso2 众多产品中 wso2-ei (proct-ie) 是依赖 siddhi 实现的 carbon-event-processing 来执行所有流式规则,如下以源码的方式进行编译:
注 根据日志提示即可访问 https://10.1.2.3:9443/carbon/ 直接进行访问,默认用户为 admin / admin
WSO2 ESB 允许系统管理员和SOA架构师轻松的配置消息路由, 虚拟化, 中介, 转换, 日志记录, 任务调度, 负载均衡, 失败路由, 事件中介等等. 运行时被设计为完全异步, 非阻塞 、连续的。基于 Apache Synapse 中介引擎,Apache Synapse是使用Apache Axis2创建的。
各种数据通过 receiving 获取数据,然后通过 SiddhiSpout 接收数据,发射到后端 siddhiBolt 中逐步执行,并完成所有的规则:
4. 想转行到大数据开发需要学习哪些技术
如果要学习大数据,不管你是零基础,还是有一定的基础,都是要懂至少一种计算机编程语言,因为大数据的开发离不开编程语言,不仅要懂,还要精通!但这门编程语言不一定是java。
比如说,如果你主攻Hadoop开发方向,是一定要学习java的,因为Hadoop是由java来开发的。
如果你想要主攻spark方向,是要学习Scala语言的,每个方向要求的编程语言是不同的。
如果你是想要走数据分析方向,那你就要从python编程语言下手,这个也是看自己未来的需求的。
大数据是需要一定的编程基础的,但具体学习哪一门编程,自己可以选择的。其实只要学会了一门编程语言,其他编程语言也是不在话下的。
5. 怎样学习大数据
首先我们要了解Java语言和Linux操作系统,这两个是学习大数据的基础,学习的顺序不分前后。
Java :只要了解一些基础即可,做大数据不需要很深的Java 技术,学java SE 就相当于有学习大数据基础。
Linux:因为大数据相关软件都是在Linux上运行的,所以Linux要学习的扎实一些,学好Linux对你快速掌握大数据相关技术会有很大的帮助,能让你更好的理解hadoop、hive、hbase、spark等大数据软件的运行环境和网络环境配置,能少踩很多坑,学会shell就能看懂脚本这样能更容易理解和配置大数据集群。还能让你对以后新出的大数据技术学习起来更快。
Hadoop:这是现在流行的大数据处理平台几乎已经成为大数据的代名词,所以这个是必学的。Hadoop里面包括几个组件HDFS、MapRece和YARN,HDFS是存储数据的地方就像我们电脑的硬盘一样文件都存储在这个上面,MapRece是对数据进行处理计算的,它有个特点就是不管多大的数据只要给它时间它就能把数据跑完,但是时间可能不是很快所以它叫数据的批处理。
Zookeeper:这是个万金油,安装Hadoop的HA的时候就会用到它,以后的Hbase也会用到它。它一般用来存放一些相互协作的信息,这些信息比较小一般不会超过1M,都是使用它的软件对它有依赖,对于我们个人来讲只需要把它安装正确,让它正常的run起来就可以了。
Mysql:我们学习完大数据的处理了,接下来学习学习小数据的处理工具mysql数据库,因为一会装hive的时候要用到,mysql需要掌握到什么层度那?你能在Linux上把它安装好,运行起来,会配置简单的权限,修改root的密码,创建数据库。这里主要的是学习SQL的语法,因为hive的语法和这个非常相似。
Sqoop:这个是用于把Mysql里的数据导入到Hadoop里的。当然你也可以不用这个,直接把Mysql数据表导出成文件再放到HDFS上也是一样的,当然生产环境中使用要注意Mysql的压力。
Hive:这个东西对于会SQL语法的来说就是神器,它能让你处理大数据变的很简单,不会再费劲的编写MapRece程序。有的人说Pig那?它和Pig差不多掌握一个就可以了。
Oozie:既然学会Hive了,我相信你一定需要这个东西,它可以帮你管理你的Hive或者MapRece、Spark脚本,还能检查你的程序是否执行正确,出错了给你发报警并能帮你重试程序,最重要的是还能帮你配置任务的依赖关系。我相信你一定会喜欢上它的,不然你看着那一大堆脚本,和密密麻麻的crond是不是有种想屎的感觉。
Hbase:这是Hadoop生态体系中的NOSQL数据库,他的数据是按照key和value的形式存储的并且key是唯一的,所以它能用来做数据的排重,它与MYSQL相比能存储的数据量大很多。所以他常被用于大数据处理完成之后的存储目的地。
Kafka:这是个比较好用的队列工具,队列是干吗的?排队买票你知道不?数据多了同样也需要排队处理,这样与你协作的其它同学不会叫起来,你干吗给我这么多的数据(比如好几百G的文件)我怎么处理得过来,你别怪他因为他不是搞大数据的,你可以跟他讲我把数据放在队列里你使用的时候一个个拿,这样他就不在抱怨了马上灰流流的去优化他的程序去了,因为处理不过来就是他的事情。而不是你给的问题。当然我们也可以利用这个工具来做线上实时数据的入库或入HDFS,这时你可以与一个叫Flume的工具配合使用,它是专门用来提供对数据进行简单处理,并写到各种数据接受方(比如Kafka)的。
Spark:它是用来弥补基于MapRece处理数据速度上的缺点,它的特点是把数据装载到内存中计算而不是去读慢的要死进化还特别慢的硬盘。特别适合做迭代运算,所以算法流们特别稀饭它。它是用scala编写的。Java语言或者Scala都可以操作它,因为它们都是用JVM的。
6. 大数据培训需要多长时间难不难学
一般大数据的学习方式有两种:
线下脱产学习,线上视频教学。如果是0基础学员参加线下脱产班学习的话,大多数培训机构都是6个月左右的周期。
大数据的学习有一定难度,对于0基础的小白来说,一定要细心、耐心,认真听课,多多练习。大数据的薪资待遇是比较可观的,目前大数据开发招聘还是以技术为主,大数据需要学习hadoop、spark、storm、超大集群调优、机器学习、并发编程等,加米谷的具体如下:
Java,大数据基础:Linux基础、Maven基础
HDFS分布式文件系统
MapRece分布式计算模型+Yarn分布式资源管理器+Zookeeper分布式协调服务
Hbase分布式数据 库+Hive分布式数据仓库
FlumeNG分布式数据采集系统+Sqoop大数据迁移系统
Scala大数据黄金语言+kafka分布式总线系统
SparkCore大数据计算基石+SparkSQL数据挖掘利器+SparkStreaming流式计算平台
SparkMllib机器学习平台+SparkGraphx图计算平台
大数据项目实战
7. storm基本概念
流式计算中,各个中间件产品对计算过程中的角色的抽象都不尽相同,实现方式也是千差万别。本文针对storm中间件在进行流式计算中的几个概念做个概括总结。
storm分布式计算结构称为topology(拓扑)由stream,spout,bolt组成。
spout代表一个storm拓扑中的数据入口,连接到数据源,将数据转化为一个个tuple,并发射tuple
stream是由无限制个tuple组成的序列。tuple为storm的核心数据结构,是包含了一个或多个键值对的列表。
bolt可以理解为计算程序中的运算或者函数,bolt的上游是输入流,经过bolt实施运算后,可输出一个或者多个输出流。
bolt可以订阅多个由spout或者其他bolt发射的数据流,用以构建复杂的数据流转换网络。
上述即为storm最基本的组成元素,无论storm如何运行,都是以stream,spout,bolt做为最基本的运行单元。而这三者则是共同构成了一个storm拓扑topology。
首先需要明确一个概念,bolt,spout实例,都属于任务,spout产生数据流,并发射,bolt消费数据流,进行计算,并进行落地或再发射,他们的存在以及运行过程都需要消耗资源,而storm集群是一个提供了资源的集群,我们要做的就是将spout/boult实例合理分配到storm集群提供的计算资源上,这样就可以让spout/bolt得以执行。
worker为JVM进程,一个topology会分配到一个或者多个worker上运行。
executor是worker内的java线程,是具体执行bolt/spout实例用的。下篇文章在介绍如何提供storm并行计算能力时会介绍worker以及executor的配置。
在storm中,worker是由supervisor进程创建,并进行监控的。storm集群遵循主从模式,主为nimbus,从为supervisor,storm集群由一个主节点(确实有单点问题),和多个工作节点(supervisor)组成,并使用zookeeper来协调集群中的状态信息,比如任务分配情况,worker状态,supervisor的拓扑度量。
通过配置可指定supervisor上可运行多少worker。一个worker代表一个slot。
nimbus守护进程的主要职责是管理,协调和监控在集群上运行的topology.包括topology的发布,任务指派,事件处理失败时重新指派任务。
supervisor守护进程等待nimbus分配任务后生成并监控workers执行任务。supervosior和worker都是运行在不同的JVM进程上。
了解了集群模式下,storm大致的分布概念,下面结合笔者做的一个实例,了解一下如何发布计算资源到storm集群上。
笔者定义了一个spout,两个bolt 运算过程如下:
其中streamMaking是一个不断生成随机数(5~30)的spout实例,Step1Bolt会过滤掉15以下的随机数(过滤),15以上的随机数会乘以16(计算),再将结果向后发射。Step2Bolt订阅Step1Bolt发射的数据,接收数据后,打印输出。流程结束。
笔者在定义spout/bolt实例时,配置了spout,bolt的并行执行数。其中
streamMaking:4 Step1Bolt:2 Step2Bolt 1
这样,发布成功后,storm会根据我的配置,分配足够的计算资源给予spout/bolt进行执行。
发布:
发布时,spout和bolt都是在一起以jar的形式发布到nimbus上的,分配后,内部定义的spout和bolt将以组件的形式被nimbus分配至worker进程中执行。
其中worker都是由supervisor创建的,创建出来的worker进程与supervisor是分开的不同进程。一个supervisor可创建多少worker可通过修改storm安装目录下的storm.yaml进行配置。
task是执行的最小单元。spout/bolt实例在定义中指定了,要起多少task,以及多少executor。也即一个topology发布之前已经定义了task总量,和需要多少资源来执行我的task总量。nimbus将根据已有的计算资源进行分配。
下图中: nimbus左边代表着计算任务量,和所需计算配置
nimbus右边代表着计算资源
nimbus将根据计算资源信息,合理的分发计算任务量。
发布成功后,通过storm自带的UI功能,可以查看你发布的topology运行以及其中每个组件的分布执行情况。
监控图像中清晰的显示了,目前部署的topology,以及topology中每个组件所分配的计算资源所在host,以及每个组件发射了多少tuple,接收了多少tuple,以及有多少个executor在并行执行。
本文讲述了storm内的基本元素以及基本概念,后续将讲述storm的重点配置信息,以及如何提高并发计算能力,窗口概念等高级特性,后续会进行源码分析,以及与其他实时计算中间件的比较。
8. 大数据行业有哪些工作机会,招聘的岗位技能有哪些
大数据主要有以下职位: 1)数据分析师Data analyst:指熟悉相关业务,熟练搭建数据分析框架,掌握和使用相关的分析常用工具和基本的分析方法,进行数据搜集、整理、分析,针对数据分析结论给管理销售运营提供指导意义的分析意见。