001/*
002 * #%L
003 * Netarchivesuite - common
004 * %%
005 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
006 *             the National Library of France and the Austrian National Library.
007 * %%
008 * This program is free software: you can redistribute it and/or modify
009 * it under the terms of the GNU Lesser General Public License as
010 * published by the Free Software Foundation, either version 2.1 of the
011 * License, or (at your option) any later version.
012 * 
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Lesser Public License for more details.
017 * 
018 * You should have received a copy of the GNU General Lesser Public
019 * License along with this program.  If not, see
020 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
021 * #L%
022 */
023
024package dk.netarkivet.common.distribute;
025
026import static dk.netarkivet.common.CommonSettings.FTP_DATATIMEOUT_SETTINGS;
027import static dk.netarkivet.common.CommonSettings.FTP_RETRIES_SETTINGS;
028import static dk.netarkivet.common.CommonSettings.FTP_SERVER_NAME;
029import static dk.netarkivet.common.CommonSettings.FTP_SERVER_PORT;
030import static dk.netarkivet.common.CommonSettings.FTP_USER_NAME;
031import static dk.netarkivet.common.CommonSettings.FTP_USER_PASSWORD;
032
033import java.io.File;
034import java.io.FileOutputStream;
035import java.io.FilterInputStream;
036import java.io.IOException;
037import java.io.InputStream;
038import java.io.OutputStream;
039import java.util.UUID;
040
041import org.apache.commons.io.IOUtils;
042import org.apache.commons.net.io.CopyStreamException;
043import org.archive.io.ArchiveRecord;
044import org.archive.io.arc.ARCRecord;
045import org.archive.io.warc.WARCRecord;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import dk.netarkivet.common.exceptions.ArgumentNotValid;
050import dk.netarkivet.common.exceptions.IOFailure;
051import dk.netarkivet.common.utils.FileUtils;
052import dk.netarkivet.common.utils.NotificationType;
053import dk.netarkivet.common.utils.NotificationsFactory;
054import dk.netarkivet.common.utils.Settings;
055
056/**
057 * This class extends the functionality of FTPRemoteFile by allowing local input to be taken from an ArchiveRecord. It
058 * has factory methods which return an instance of FTPRemoteFile when a File is used as input so that behavior is
059 * effectively delegated to that class when required.
060 */
061@SuppressWarnings({"serial"})
062public class ExtendedFTPRemoteFile implements RemoteFile {
063
064    /** A named logger for this class. */
065    private static final transient Logger log = LoggerFactory.getLogger(ExtendedFTPRemoteFile.class);
066
067    /** The record to be read from the archive. */
068    private transient ArchiveRecord record;
069
070    /**
071     * The name to be used for the original record. ArchiveRecords do not necessarily possess natural names so a guid is
072     * used. For arcfiles, this is not guaranteed to be the same across multiple fetches of the same record
073     */
074    private String name;
075
076    
077    /** How many times we will retry upload, download, and logon. */
078    private static final transient int FTP_RETRIES = Settings.getInt(FTP_RETRIES_SETTINGS);
079
080    /** How large a data timeout on our FTP connections. */
081    private static final transient int FTP_DATATIMEOUT = Settings.getInt(FTP_DATATIMEOUT_SETTINGS);
082
083    /** The default place in classpath where the settings file can be found. */
084    private static final String DEFAULT_SETTINGS_CLASSPATH = "dk/netarkivet/common/distribute/FTPRemoteFileSettings.xml";
085
086    /*
087     * The static initialiser is called when the class is loaded. It will add default values for all settings defined in
088     * this class, by loading them from a settings.xml file in classpath.
089     */
090    static {
091        Settings.addDefaultClasspathSettings(DEFAULT_SETTINGS_CLASSPATH);
092    }
093    
094    /** The name that we use for the file on the FTP server. */
095    private final String ftpFileName;
096
097        private FTPConnectionManager connectionManager;
098
099    /**
100     * Create an instance of this class connected to an ARC or WARC record. Unfortunately the reflection we use to find
101     * the factory method cannot find this method directly because the runtime-class of the parameter is not
102     * ArchiveRecord. Therefore we also define the two specific overloaded factory methods for ARCRecords and
103     * WARCRecord.
104     *
105     * @param record the record
106     * @return the instance
107     */
108    public static RemoteFile getInstance(ArchiveRecord record) {
109        return new ExtendedFTPRemoteFile(record);
110    }
111
112    /**
113     * Create an instance of this class connected to an ARCRecord.
114     *
115     * @param record the record
116     * @return the instance
117     */
118    public static RemoteFile getInstance(ARCRecord record) {
119        return getInstance((ArchiveRecord) record);
120    }
121
122    /**
123     * Create an instance of this class connected to a WARCRecord.
124     *
125     * @param record the record
126     * @return the instance
127     */
128    public static RemoteFile getInstance(WARCRecord record) {
129        return getInstance((ArchiveRecord) record);
130    }
131
132    /**
133     * This method returns an instance of FTPRemoteFile using the factory method with the same signature in that class.
134     *
135     * @param localFile File object for the remote file
136     * @param useChecksums If true, checksums will be used to check transfers.
137     * @param fileDeletable If true, this file will be deleted after upload to FTP.
138     * @param multipleDownloads If true, the file will not be removed from FTP server automatically after first
139     * download.
140     * @return FTPRemoteFile object
141     * @throws IOFailure if FTPRemoteFile creation fails
142     */
143    public static RemoteFile getInstance(File localFile, Boolean useChecksums, Boolean fileDeletable,
144            Boolean multipleDownloads) throws IOFailure {
145        ArgumentNotValid.checkNotNull(localFile, "File remoteFile");
146        return FTPRemoteFile.getInstance(localFile, useChecksums, fileDeletable, multipleDownloads, null);
147    }
148
149    /**
150     * This method returns an instance of FTPRemoteFile using the factory method with the same signature in that class.
151     *
152     * @param localFile File object for the remote file
153     * @param useChecksums If true, checksums will be used to check transfers.
154     * @param fileDeletable If true, this file will be deleted after upload to FTP.
155     * @param multipleDownloads If true, the file will not be removed from FTP server automatically after first
156     * download.
157     * @return FTPRemoteFile object
158     * @throws IOFailure if FTPRemoteFile creation fails
159     */
160    public static RemoteFile getInstance(File localFile, Boolean useChecksums, Boolean fileDeletable,
161            Boolean multipleDownloads, RemoteFileSettings connectionParams) throws IOFailure {
162        ArgumentNotValid.checkNotNull(localFile, "File remoteFile");
163        return FTPRemoteFile.getInstance(localFile, useChecksums, fileDeletable, multipleDownloads, connectionParams);
164    }
165
166    @Override
167    public void copyTo(File destFile) {
168        ArgumentNotValid.checkNotNull(destFile, "File destFile");
169        destFile = destFile.getAbsoluteFile();
170        if ((!destFile.isFile() || !destFile.canWrite())
171                && (!destFile.getParentFile().isDirectory() || !destFile.getParentFile().canWrite())) {
172            throw new ArgumentNotValid("Destfile '" + destFile + "' does not point to a writable file for "
173                    + "remote file '" + toString() + "'");
174        }
175        if (log.isDebugEnabled()) {
176            log.debug("Writing {} to {}", toString(), destFile.getAbsolutePath());
177        }
178        FileOutputStream fos = null;
179        try {
180            fos = new FileOutputStream(destFile);
181            appendTo(fos);
182        } catch (Exception e) {
183            FileUtils.remove(destFile);
184            throw new IOFailure("IO trouble transferring file", e);
185        } finally {
186            IOUtils.closeQuietly(fos);
187        }
188    }
189
190    @Override
191    public void appendTo(OutputStream out) {
192        ArgumentNotValid.checkNotNull(out, "OutputStream out");
193        connectionManager.logOn();
194        try {
195            if (!connectionManager.getFTPClient().retrieveFile(ftpFileName, out)) {
196                final String msg = "Append operation from '" + ftpFileName + "' failed: " + connectionManager.getFtpErrorMessage();
197                log.warn(msg);
198                throw new IOFailure(msg);
199            }
200            out.flush();
201        } catch (IOException e) {
202            String msg = "Append operation from '" + ftpFileName + "' failed ";
203            if (e instanceof CopyStreamException) {
204                CopyStreamException realException = (CopyStreamException) e;
205                msg += "(real cause = " + realException.getIOException() + ")";
206            }
207            log.warn(msg, e);
208            throw new IOFailure(msg, e);
209        } finally {
210                connectionManager.logOut();
211            cleanup();
212        }
213    }
214
215    @Override
216    public InputStream getInputStream() {
217        connectionManager.logOn();
218        try {
219            InputStream in = connectionManager.getFTPClient().retrieveFileStream(ftpFileName);
220            return new FilterInputStream(in) {
221                public void close() throws IOException {
222                    try {
223                        super.close();
224                    } finally {
225                        connectionManager.logOut();
226                        cleanup();
227                    }
228                }
229            };
230        } catch (IOException e) {
231            String msg = "Creating inputstream from '" + ftpFileName + "' failed ";
232            if (e instanceof CopyStreamException) {
233                CopyStreamException realException = (CopyStreamException) e;
234                msg += "(real cause = " + realException.getIOException() + ")";
235            }
236            log.warn(msg, e);
237            throw new IOFailure(msg, e);
238        }
239    }
240
241    @Override
242    public String getName() {
243        return name;
244    }
245
246    /**
247     * Checksums are not available in this implementation. Returns null.
248     *
249     * @return null
250     */
251    @Override
252    public String getChecksum() {
253        return null;
254    }
255
256    /**
257     * The cleanup to be effected is deletion of the intermediate file from the ftp server.
258     */
259    @Override
260    public void cleanup() {
261        log.debug("Deleting file '{}' from ftp server", ftpFileName);
262        try {
263                connectionManager.logOn();
264                connectionManager.getFTPClient().deleteFile(ftpFileName);
265        } catch (Exception e) {
266            log.warn("Error while deleting ftp file '{}' for file '{}'", ftpFileName, name, e);
267        } finally {
268            // try to disconnect before returning from method
269            try {
270                connectionManager.logOut();
271            } catch (Exception e) {
272                log.warn("Unexpected error while logging out ", e);
273            }
274        }
275        log.debug("File '{}' deleted from ftp server. Cleanup finished.", ftpFileName);
276    }
277
278    /**
279     * For an ARCRecord, this is the length of the record as defined in the header. For a WARCRecods, this is the
280     * payload length, defined as the difference between the total record length and the size of the header.
281     *
282     * @return the length of the record content in bytes.
283     */
284    @Override
285    public long getSize() {
286        if (record instanceof ARCRecord) {
287            return record.getHeader().getLength();
288        } else if (record instanceof WARCRecord) {
289            // The length of the payload of the warc-record is not getLength(),
290            // but getLength minus getContentBegin(), which is the number of
291            // bytes used for the record-header!
292            return record.getHeader().getLength() - record.getHeader().getContentBegin();
293        } else {
294            throw new ArgumentNotValid("Unknown type of ArchiveRecord: " + record.getClass());
295        }
296    }
297
298    /**
299     * Creates a RemoteFile instance by uploading the content of the given record to a file on the ftp server.
300     *
301     * @param record The record to be copied.
302     */
303    private ExtendedFTPRemoteFile(ArchiveRecord record) {
304        this.record = record;
305        this.name = UUID.randomUUID().toString();
306        this.ftpFileName = this.name;
307        if (log.isDebugEnabled()) {
308            log.debug("Created {} with name {}", this.getClass().getName(), toString());
309        }
310
311        this.connectionManager = new FTPConnectionManager(
312                        Settings.get(FTP_USER_NAME), 
313                        Settings.get(FTP_USER_PASSWORD), 
314                        Settings.get(FTP_SERVER_NAME), 
315                        Settings.getInt(FTP_SERVER_PORT), 
316                        Settings.getInt(FTP_RETRIES_SETTINGS), 
317                        Settings.getInt(FTP_DATATIMEOUT_SETTINGS));
318        
319        connectionManager.logOn();
320        boolean success = false;
321        int tried = 0;
322        String message = null;
323        while (!success && tried < FTP_RETRIES) {
324            tried++;
325            try {
326                success = connectionManager.getFTPClient().storeFile(ftpFileName, record);
327                if (!success) {
328                    log.debug("FTP store failed attempt '{}' of " + FTP_RETRIES + ": {}", tried, connectionManager.getFtpErrorMessage());
329                }
330            } catch (IOException e) {
331                message = "Write operation to '" + ftpFileName + "' failed on attempt " + tried + " of "
332                        + FTP_RETRIES;
333                if (e instanceof CopyStreamException) {
334                    CopyStreamException realException = (CopyStreamException) e;
335                    message += "(real cause = " + realException.getIOException() + ")";
336                }
337                log.debug(message, e);
338            }
339        }
340        if (!success) {
341            final String msg = "Failed to upload '" + name + "' after " + tried + " attempts. Reason for last failure: " +  message;
342            log.warn(msg);
343            // Send an Notification because of this
344            NotificationsFactory.getInstance().notify(msg, NotificationType.ERROR);
345            throw new IOFailure(msg);
346        }
347        log.debug("Completed writing the file '{}'", ftpFileName);
348        try {
349            if (record != null) {
350                record.close();
351            }
352        } catch (IOException e) {
353            // not a serious bug
354            log.warn("Problem closing inputstream: ", e);
355        }
356        connectionManager.logOut();
357        log.debug("Ftp logout");
358    }
359
360    /**
361     * A human readable description of the object which should be sufficient to identify and track it.
362     *
363     * @return description of this object.
364     */
365    public String toString() {
366        return record.getHeader().getRecordIdentifier() + "_" + record.getHeader().getOffset() + "_" + "(" + name + ")";
367    }
368}