001import java.io.File;
002import java.io.IOException;
003import java.nio.file.Files;
004import java.nio.file.StandardOpenOption;
005import java.security.PrivilegedExceptionAction;
006import java.util.ArrayList;
007import java.util.List;
008import java.util.UUID;
009import java.util.regex.Pattern;
010
011import org.apache.hadoop.conf.Configuration;
012import org.apache.hadoop.fs.FileSystem;
013import org.apache.hadoop.fs.Path;
014import org.apache.hadoop.security.UserGroupInformation;
015import org.apache.hadoop.util.ToolRunner;
016import org.slf4j.Logger;
017import org.slf4j.LoggerFactory;
018
019import dk.netarkivet.common.CommonSettings;
020import dk.netarkivet.common.utils.Settings;
021import dk.netarkivet.common.utils.hadoop.GetMetadataMapper;
022import dk.netarkivet.common.utils.hadoop.HadoopFileUtils;
023import dk.netarkivet.common.utils.hadoop.HadoopJob;
024import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy;
025import dk.netarkivet.common.utils.hadoop.HadoopJobTool;
026import dk.netarkivet.common.utils.hadoop.HadoopJobUtils;
027import dk.netarkivet.common.utils.hadoop.MetadataExtractionStrategy;
028import sun.security.krb5.KrbException;
029
030public class MetadataIndexingApplication {
031    private static final Logger log = LoggerFactory.getLogger(MetadataIndexingApplication.class);
032
033    private static void usage() {
034        System.out.println("Usage: java MetadataIndexingApplication <inputFile>");
035    }
036
037    /**
038     * Start a hadoop job that fetches seeds reports out of metadata files. The single input argument
039     * is a path to the input file in the local file system. The input file is a newline-separated
040     * list of metadata paths to be processed. The lines input paths may be any combination of "file://"
041     * and "hdfs://" URIs.
042     *
043     * @param args
044     * @throws IOException
045     * @throws InterruptedException
046     */
047    public static void main(String[] args) throws Exception {
048
049        String localInputFileString = args[0];
050        if (localInputFileString == null || localInputFileString.length() == 0) {
051            usage();
052            System.exit(1);
053        }
054        File localInputFile = new File(localInputFileString);
055        if (!localInputFile.exists() && localInputFile.isFile()) {
056            System.out.println("No such file " + localInputFile.getAbsolutePath());
057            usage();
058            System.exit(2);
059        }
060
061        HadoopJobUtils.doKerberosLogin();
062        Configuration conf = HadoopJobUtils.getConf();
063        conf.setPattern(GetMetadataMapper.URL_PATTERN,  Pattern.compile("metadata://[^/]*/crawl/reports/seeds-report.txt.*"));
064        conf.setPattern(GetMetadataMapper.MIME_PATTERN,  Pattern.compile(".*"));
065
066        try (FileSystem fileSystem = FileSystem.newInstance(conf)) {
067            long id = 0L;
068            HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem);
069            HadoopJob job = new HadoopJob(id, jobStrategy);
070            UUID uuid = UUID.randomUUID();
071            Path jobInputFile = jobStrategy.createJobInputFile(uuid);
072            job.setJobInputFile(jobInputFile);
073            log.info("Putting local input file in hdfs at " + jobInputFile);
074            fileSystem.copyFromLocalFile(false, new Path(localInputFile.getAbsolutePath()),
075                    jobInputFile);
076            Path jobOutputDir = jobStrategy.createJobOutputDir(uuid);
077            job.setJobOutputDir(jobOutputDir);
078            ToolRunner.run(new HadoopJobTool(conf, new GetMetadataMapper()),
079                    new String[] {jobInputFile.toString(), jobOutputDir.toString()});
080        }
081    }
082}