问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

mapreduce怎么读取增量的hdfs文件

发布网友 发布时间:2022-05-01 12:03

我来回答

1个回答

热心网友 时间:2023-10-11 04:19

package com.fora;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.Mapper.Context;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class FileOperate {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
init();/*初始化文件*/ 
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(FileOperate.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/* set the path of input and output*/ 
FileInputFormat.addInputPath(job, new Path("hdfs:///copyOftest.c"));
FileOutputFormat.setOutputPath(job, new Path("hdfs:///wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void rece(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void init()throws IOException {
/*copy local file to hdfs*/ 
Configuration config = new Configuration();
FileSystem hdfs = null;
String srcFile = "/test.c";
String dstFile = "hdfs:///copyOftest.c";
System.out.print("copy success!\n");
hdfs = FileSystem.get(config);
Path srcPath = new Path(srcFile);
Path dstPath = new Path(dstFile);
hdfs.copyFromLocalFile(srcPath, dstPath);
String fileName = "hdfs:///copyOftest.c";
Path path = new Path(fileName);
FileStatus fileStatus =null;
fileStatus = hdfs.getFileStatus(path);
System.out.println(fileStatus.getBlockSize());
FileSystem fs = FileSystem.get(config);
DistributedFileSystem hdfs1 = (DistributedFileSystem) fs;
DatanodeInfo[] dataNodeStats = hdfs1.getDataNodeStats();
/*create a file on hdfs*/ 
Path Outputpath = new Path("hdfs:///output/listOfDatanode");
FSDataOutputStream outputStream = hdfs.create(Outputpath);
String[] names = new String[dataNodeStats.length];
for (int i = 0; i < dataNodeStats.length; i++) {
names[i] = dataNodeStats[i].getHostName();/*get the list of datanodes*/ 
System.out.println(names[i]);
/*write the list of datanodes to file on hdfs*/ 
outputStream.write(names[i].getBytes(), 0, names[i].length());
}
}
}

热心网友 时间:2023-10-11 04:19

package com.fora;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.maprece.Job;
import org.apache.hadoop.maprece.Mapper;
import org.apache.hadoop.maprece.Recer;
import org.apache.hadoop.maprece.Mapper.Context;
import org.apache.hadoop.maprece.lib.input.FileInputFormat;
import org.apache.hadoop.maprece.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class FileOperate {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
init();/*初始化文件*/ 
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(FileOperate.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumRecer.class);
job.setRecerClass(IntSumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/* set the path of input and output*/ 
FileInputFormat.addInputPath(job, new Path("hdfs:///copyOftest.c"));
FileOutputFormat.setOutputPath(job, new Path("hdfs:///wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumRecer
extends Recer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void rece(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void init()throws IOException {
/*copy local file to hdfs*/ 
Configuration config = new Configuration();
FileSystem hdfs = null;
String srcFile = "/test.c";
String dstFile = "hdfs:///copyOftest.c";
System.out.print("copy success!\n");
hdfs = FileSystem.get(config);
Path srcPath = new Path(srcFile);
Path dstPath = new Path(dstFile);
hdfs.copyFromLocalFile(srcPath, dstPath);
String fileName = "hdfs:///copyOftest.c";
Path path = new Path(fileName);
FileStatus fileStatus =null;
fileStatus = hdfs.getFileStatus(path);
System.out.println(fileStatus.getBlockSize());
FileSystem fs = FileSystem.get(config);
DistributedFileSystem hdfs1 = (DistributedFileSystem) fs;
DatanodeInfo[] dataNodeStats = hdfs1.getDataNodeStats();
/*create a file on hdfs*/ 
Path Outputpath = new Path("hdfs:///output/listOfDatanode");
FSDataOutputStream outputStream = hdfs.create(Outputpath);
String[] names = new String[dataNodeStats.length];
for (int i = 0; i < dataNodeStats.length; i++) {
names[i] = dataNodeStats[i].getHostName();/*get the list of datanodes*/ 
System.out.println(names[i]);
/*write the list of datanodes to file on hdfs*/ 
outputStream.write(names[i].getBytes(), 0, names[i].length());
}
}
}

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
请大家看看电脑配置怎么样,值多少钱,玩大型网游,页游会不会卡 帮忙看一下旧台式电脑能卖多少钱,谢谢 高考文科考了590,排名3455,福建的,想去外省的本一,能报哪里,急啊= = 华侨大学云南省高考分数为522的能从商学院转建筑学院吗? 厦门华侨大学录取 高考志愿填报应该注意哪些问题 成熟男人戴手表好看吗,成熟男人戴什么手表? 金铲铲之战金色神将阵容怎么搭-金铲铲之战金色神将阵容搭配攻略 金铲铲之战赌蜘蛛阵容推荐 蛛后主C装备搭配攻略 金铲铲之战仙灵蜘蛛阵容搭配推荐 从hdfs读取excel文件内容返回的数据为空 Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的 2022年2月1日是星期二玲玲的生日是3月6日她的生日那天是星期几? 编写一个JAVA类方法,通过该方法可以获取出存储在HDFS集群中根目录的所有... 2000个3除7余几 一个数除以七商33余数最大这个数是多少?列试 1、33……33(2006个3)除以7,余数是多少?2、77……77(100个7)被3整除后,求余数 如何从hdfs缓存中直接读取数据 老式楼房,厨房窗户在楼道,如何安装燃气热水器有什么好办法 求333333....33(2000个3)除以7的余数是多少? (今天就要答案) ()除以7等于33余(),被除数最大是几 电脑开机光驱响 怎么解决 33333......(2000个)除7余几 笔记本光驱响 330÷7等于几? 为什么笔记本电脑在开机时光驱那里总要响一下? 33除7等于多少? 电脑开机光驱一直在响屏幕是黑的怎么回事?? 33除以7的小数部分上前50个数位上的数字之和是多少? 电脑开机时,光驱老是响一下就不响了,而且还开不开机 如何让Hadoop读取以gz结尾的文本格式的文件 手机游戏是从华为应用商店下载的,那么怎么能在电脑上玩手游呢?神雕侠侣2 我在华为应用市场手机上面下载了手游游戏,怎么才能在电脑上面玩? 捷信逾期两年了还能商量还钱吗? 貔貅吊坠断成两半寓意什么? 黑曜石貔貅从中间断成两半寓意着什么? 你们看貔貅被摔成两截,寓意着什么。 不小心把掉地上从两个貔貅中间摔成两半貔貅摔坏了有什么寓意 刚买的黑曜石貔貅手链破碎了寓意着什么? 黑曜石的貔貅碎了有什么寓意? 怎样让鱼缸长出藻类? 怎样让鱼缸里长水藻? 貔貅的寓意是什么呀? 我的apple id 已被停用,若要启用请在appleid apple.com&#47;zh-CN重设密码 鱼缸或水池里面如何让水藻怎么长的快? 怎么样才能让鱼缸长绿藻 如何让锦鲤鱼缸中绿藻长得慢 如何连接网络电视 NUBWO/狼博旺 魔轮耳机怎么样,好用吗 NUBWO狼博旺N1耳机怎么样