/*
 * #%L
 * Netarchivesuite - common
 * %%
 * Copyright (C) 2005 - 2014 The Royal Danish Library, the Danish State and University Library,
 *             the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Lesser Public License for more details.
 *
 * You should have received a copy of the GNU General Lesser Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */

package dk.netarkivet.common.distribute;

import static dk.netarkivet.common.CommonSettings.FTP_DATATIMEOUT_SETTINGS;
import static dk.netarkivet.common.CommonSettings.FTP_RETRIES_SETTINGS;
import static dk.netarkivet.common.CommonSettings.FTP_SERVER_NAME;
import static dk.netarkivet.common.CommonSettings.FTP_SERVER_PORT;
import static dk.netarkivet.common.CommonSettings.FTP_USER_NAME;
import static dk.netarkivet.common.CommonSettings.FTP_USER_PASSWORD;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.apache.commons.net.io.CopyStreamException;
import org.archive.io.ArchiveRecord;
import org.archive.io.arc.ARCRecord;
import org.archive.io.warc.WARCRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.NotificationType;
import dk.netarkivet.common.utils.NotificationsFactory;
import dk.netarkivet.common.utils.Settings;

/**
 * This class extends the functionality of FTPRemoteFile by allowing local input to be taken from an ArchiveRecord. It
 * has factory methods which return an instance of FTPRemoteFile when a File is used as input so that behavior is
 * effectively delegated to that class when required.
 * <p>
 * An instance created from a record uploads the record content to the FTP server in its constructor; the content is
 * later fetched with {@link #copyTo(File)}, {@link #appendTo(OutputStream)} or {@link #getInputStream()}, each of
 * which removes the intermediate file from the FTP server when it is done.
 * <p>
 * NOTE(review): no serialVersionUID is declared. Presumably instances are serialized through the RemoteFile
 * interface - confirm before changing the modifiers of the non-transient instance fields, since that would alter the
 * default serialVersionUID.
 */
@SuppressWarnings({"serial"})
public class ExtendedFTPRemoteFile implements RemoteFile {

    /** A named logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(ExtendedFTPRemoteFile.class);

    /** The default place in classpath where the settings file can be found. */
    private static final String DEFAULT_SETTINGS_CLASSPATH = "dk/netarkivet/common/distribute/FTPRemoteFileSettings.xml";

    /*
     * The static initialiser is called when the class is loaded. It will add default values for all settings defined in
     * this class, by loading them from a settings.xml file in classpath.
     *
     * Static initialisers run in textual order, so this block must stay above every static field that reads from
     * Settings; otherwise those reads happen before the defaults have been registered.
     */
    static {
        Settings.addDefaultClasspathSettings(DEFAULT_SETTINGS_CLASSPATH);
    }

    /** How many times we will retry upload, download, and logon. */
    private static final int FTP_RETRIES = Settings.getInt(FTP_RETRIES_SETTINGS);

    /** How large a data timeout on our FTP connections. */
    private static final int FTP_DATATIMEOUT = Settings.getInt(FTP_DATATIMEOUT_SETTINGS);

    /**
     * The record to be read from the archive. The field is transient, so it may be null on an instance that was not
     * created through the constructor of this class (presumably after deserialization).
     */
    private transient ArchiveRecord record;

    /**
     * The name to be used for the original record. ArchiveRecords do not necessarily possess natural names so a guid is
     * used. For arcfiles, this is not guaranteed to be the same across multiple fetches of the same record
     */
    private String name;

    /** The name that we use for the file on the FTP server. */
    private final String ftpFileName;

    /** Manages logon, logout and access to the FTP client used for all transfers of this file. */
    private FTPConnectionManager connectionManager;

    /**
     * Create an instance of this class connected to an ARC or WARC record. Unfortunately the reflection we use to find
     * the factory method cannot find this method directly because the runtime-class of the parameter is not
     * ArchiveRecord. Therefore we also define the two specific overloaded factory methods for ARCRecords and
     * WARCRecord.
     *
     * @param record the record
     * @return the instance
     * @throws ArgumentNotValid if the record is null
     * @throws IOFailure if the record content could not be uploaded to the FTP server
     */
    public static RemoteFile getInstance(ArchiveRecord record) {
        return new ExtendedFTPRemoteFile(record);
    }

    /**
     * Create an instance of this class connected to an ARCRecord.
     *
     * @param record the record
     * @return the instance
     */
    public static RemoteFile getInstance(ARCRecord record) {
        return getInstance((ArchiveRecord) record);
    }

    /**
     * Create an instance of this class connected to a WARCRecord.
     *
     * @param record the record
     * @return the instance
     */
    public static RemoteFile getInstance(WARCRecord record) {
        return getInstance((ArchiveRecord) record);
    }

    /**
     * This method returns an instance of FTPRemoteFile using the factory method with the same signature in that class.
     *
     * @param localFile File object for the remote file
     * @param useChecksums If true, checksums will be used to check transfers.
     * @param fileDeletable If true, this file will be deleted after upload to FTP.
     * @param multipleDownloads If true, the file will not be removed from FTP server automatically after first
     * download.
     * @return FTPRemoteFile object
     * @throws IOFailure if FTPRemoteFile creation fails
     */
    public static RemoteFile getInstance(File localFile, Boolean useChecksums, Boolean fileDeletable,
            Boolean multipleDownloads) throws IOFailure {
        ArgumentNotValid.checkNotNull(localFile, "File localFile");
        return FTPRemoteFile.getInstance(localFile, useChecksums, fileDeletable, multipleDownloads, null);
    }

    /**
     * This method returns an instance of FTPRemoteFile using the factory method with the same signature in that class.
     *
     * @param localFile File object for the remote file
     * @param useChecksums If true, checksums will be used to check transfers.
     * @param fileDeletable If true, this file will be deleted after upload to FTP.
     * @param multipleDownloads If true, the file will not be removed from FTP server automatically after first
     * download.
     * @param connectionParams connection settings handed on unchanged to FTPRemoteFile; the other factory method
     * passes null here
     * @return FTPRemoteFile object
     * @throws IOFailure if FTPRemoteFile creation fails
     */
    public static RemoteFile getInstance(File localFile, Boolean useChecksums, Boolean fileDeletable,
            Boolean multipleDownloads, RemoteFileSettings connectionParams) throws IOFailure {
        ArgumentNotValid.checkNotNull(localFile, "File localFile");
        return FTPRemoteFile.getInstance(localFile, useChecksums, fileDeletable, multipleDownloads, connectionParams);
    }

    /**
     * Copies the content of the file on the FTP server to a local file. If the transfer fails, the (possibly partial)
     * destination file is removed again.
     *
     * @param destFile the local file to write to; it must either be an existing writable file or be located in a
     * writable directory
     * @throws ArgumentNotValid if destFile is null or cannot be written
     * @throws IOFailure if the transfer fails
     */
    @Override
    public void copyTo(File destFile) {
        ArgumentNotValid.checkNotNull(destFile, "File destFile");
        destFile = destFile.getAbsoluteFile();
        final File parentDir = destFile.getParentFile();
        final boolean writableFile = destFile.isFile() && destFile.canWrite();
        // getParentFile() returns null for a file system root, hence the explicit null guard.
        final boolean writableParent = parentDir != null && parentDir.isDirectory() && parentDir.canWrite();
        if (!writableFile && !writableParent) {
            throw new ArgumentNotValid("Destfile '" + destFile + "' does not point to a writable file for "
                    + "remote file '" + toString() + "'");
        }
        if (log.isDebugEnabled()) {
            log.debug("Writing {} to {}", toString(), destFile.getAbsolutePath());
        }
        FileOutputStream fos = null;
        try {
            fos = new FileOutputStream(destFile);
            appendTo(fos);
        } catch (Exception e) {
            // Close the stream before removing the file: an open file cannot be deleted on every platform.
            IOUtils.closeQuietly(fos);
            FileUtils.remove(destFile);
            throw new IOFailure("IO trouble transferring file", e);
        } finally {
            IOUtils.closeQuietly(fos);
        }
    }

    /**
     * Writes the content of the file on the FTP server to the given stream. Whether or not the transfer succeeds, the
     * connection is logged out and {@link #cleanup()} is called afterwards.
     *
     * @param out the stream to write to; it is flushed but not closed
     * @throws ArgumentNotValid if out is null
     * @throws IOFailure if the transfer fails
     */
    @Override
    public void appendTo(OutputStream out) {
        ArgumentNotValid.checkNotNull(out, "OutputStream out");
        connectionManager.logOn();
        try {
            if (!connectionManager.getFTPClient().retrieveFile(ftpFileName, out)) {
                final String msg = "Append operation from '" + ftpFileName + "' failed: "
                        + connectionManager.getFtpErrorMessage();
                log.warn(msg);
                throw new IOFailure(msg);
            }
            out.flush();
        } catch (IOException e) {
            final String msg = "Append operation from '" + ftpFileName + "' failed " + describeRealCause(e);
            log.warn(msg, e);
            throw new IOFailure(msg, e);
        } finally {
            // Nested so that a failing logout cannot prevent the cleanup.
            try {
                connectionManager.logOut();
            } finally {
                cleanup();
            }
        }
    }

    /**
     * Opens a stream on the content of the file on the FTP server. Closing the returned stream logs out the
     * connection and calls {@link #cleanup()}.
     *
     * @return a stream from which the file content can be read; the caller must close it
     * @throws IOFailure if the stream could not be opened
     */
    @Override
    public InputStream getInputStream() {
        connectionManager.logOn();
        boolean streamOpened = false;
        try {
            final InputStream in = connectionManager.getFTPClient().retrieveFileStream(ftpFileName);
            if (in == null) {
                // retrieveFileStream signals "data connection could not be opened" with null, not an exception.
                final String msg = "Creating inputstream from '" + ftpFileName + "' failed: "
                        + connectionManager.getFtpErrorMessage();
                log.warn(msg);
                throw new IOFailure(msg);
            }
            final InputStream result = new FilterInputStream(in) {
                @Override
                public void close() throws IOException {
                    try {
                        super.close();
                    } finally {
                        try {
                            connectionManager.logOut();
                        } finally {
                            cleanup();
                        }
                    }
                }
            };
            streamOpened = true;
            return result;
        } catch (IOException e) {
            final String msg = "Creating inputstream from '" + ftpFileName + "' failed " + describeRealCause(e);
            log.warn(msg, e);
            throw new IOFailure(msg, e);
        } finally {
            if (!streamOpened) {
                // Nobody will ever close a stream we did not hand out, so release the connection here.
                logOutQuietly();
            }
        }
    }

    /**
     * Returns the generated name of this remote file.
     *
     * @return the name, a random UUID for instances created from a record
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Checksums are not available in this implementation. Returns null.
     *
     * @return null
     */
    @Override
    public String getChecksum() {
        return null;
    }

    /**
     * The cleanup to be effected is deletion of the intermediate file from the ftp server. This method never throws;
     * problems are logged only.
     */
    @Override
    public void cleanup() {
        log.debug("Deleting file '{}' from ftp server", ftpFileName);
        boolean deleted = false;
        try {
            connectionManager.logOn();
            deleted = connectionManager.getFTPClient().deleteFile(ftpFileName);
        } catch (Exception e) {
            log.warn("Error while deleting ftp file '{}' for file '{}'", ftpFileName, name, e);
        } finally {
            // try to disconnect before returning from method
            logOutQuietly();
        }
        if (deleted) {
            log.debug("File '{}' deleted from ftp server. Cleanup finished.", ftpFileName);
        } else {
            // Only debug level: cleanup may run more than once, and then there is nothing left to delete.
            log.debug("File '{}' was not deleted from ftp server. Cleanup finished.", ftpFileName);
        }
    }

    /**
     * For an ARCRecord, this is the length of the record as defined in the header. For a WARCRecord, this is the
     * payload length, defined as the difference between the total record length and the size of the header.
     *
     * @return the length of the record content in bytes.
     * @throws ArgumentNotValid if the record is not available on this instance or is of an unknown type
     */
    @Override
    public long getSize() {
        if (record == null) {
            throw new ArgumentNotValid("The size of '" + name + "' is not available: no ArchiveRecord is attached "
                    + "to this instance");
        }
        if (record instanceof ARCRecord) {
            return record.getHeader().getLength();
        } else if (record instanceof WARCRecord) {
            // The length of the payload of the warc-record is not getLength(),
            // but getLength minus getContentBegin(), which is the number of
            // bytes used for the record-header!
            return record.getHeader().getLength() - record.getHeader().getContentBegin();
        } else {
            throw new ArgumentNotValid("Unknown type of ArchiveRecord: " + record.getClass());
        }
    }

    /**
     * Creates a RemoteFile instance by uploading the content of the given record to a file on the ftp server. The
     * record is closed after a successful upload. The FTP connection is logged out whether or not the upload
     * succeeds.
     *
     * @param record The record to be copied.
     * @throws ArgumentNotValid if the record is null
     * @throws IOFailure if the upload did not succeed within the configured number of attempts
     */
    private ExtendedFTPRemoteFile(ArchiveRecord record) {
        ArgumentNotValid.checkNotNull(record, "ArchiveRecord record");
        this.record = record;
        this.name = UUID.randomUUID().toString();
        this.ftpFileName = this.name;
        if (log.isDebugEnabled()) {
            log.debug("Created {} with name {}", this.getClass().getName(), toString());
        }

        this.connectionManager = new FTPConnectionManager(
                Settings.get(FTP_USER_NAME),
                Settings.get(FTP_USER_PASSWORD),
                Settings.get(FTP_SERVER_NAME),
                Settings.getInt(FTP_SERVER_PORT),
                Settings.getInt(FTP_RETRIES_SETTINGS),
                Settings.getInt(FTP_DATATIMEOUT_SETTINGS));

        connectionManager.logOn();
        boolean uploaded = false;
        try {
            uploadRecord();
            uploaded = true;
        } finally {
            if (!uploaded) {
                // Do not leave the session open, and do not let a logout problem hide the upload failure.
                logOutQuietly();
            }
        }
        log.debug("Completed writing the file '{}'", ftpFileName);
        try {
            record.close();
        } catch (IOException e) {
            // not a serious bug
            log.warn("Problem closing inputstream: ", e);
        }
        connectionManager.logOut();
        log.debug("Ftp logout");
    }

    /**
     * Stores the record on the FTP server under {@link #ftpFileName}, trying up to FTP_RETRIES times. Expects the
     * connection to be logged on already. On final failure a notification is sent before the exception is thrown.
     * <p>
     * NOTE(review): every attempt reads from the same record stream; presumably a retry after a partial transfer
     * cannot start from the beginning of the record again - confirm whether retrying is safe here.
     *
     * @throws IOFailure if no attempt succeeded
     */
    private void uploadRecord() {
        boolean success = false;
        int tried = 0;
        String message = null;
        while (!success && tried < FTP_RETRIES) {
            tried++;
            try {
                success = connectionManager.getFTPClient().storeFile(ftpFileName, record);
                if (!success) {
                    // Remember the server's answer so the final failure message does not just say "null".
                    message = connectionManager.getFtpErrorMessage();
                    log.debug("FTP store failed attempt '{}' of {}: {}", tried, FTP_RETRIES, message);
                }
            } catch (IOException e) {
                message = "Write operation to '" + ftpFileName + "' failed on attempt " + tried + " of "
                        + FTP_RETRIES + describeRealCause(e);
                log.debug(message, e);
            }
        }
        if (!success) {
            final String msg = "Failed to upload '" + name + "' after " + tried
                    + " attempts. Reason for last failure: " + message;
            log.warn(msg);
            // Send an Notification because of this
            NotificationsFactory.getInstance().notify(msg, NotificationType.ERROR);
            throw new IOFailure(msg);
        }
    }

    /**
     * Logs out from the FTP server, logging instead of propagating any problem. For use on paths where another
     * exception is already on its way to the caller or where failure must not be fatal.
     */
    private void logOutQuietly() {
        try {
            connectionManager.logOut();
        } catch (Exception e) {
            log.warn("Unexpected error while logging out ", e);
        }
    }

    /**
     * Builds the message suffix that exposes the exception wrapped by a CopyStreamException.
     *
     * @param e the exception thrown by the FTP client
     * @return "(real cause = ...)" if e is a CopyStreamException, otherwise the empty string
     */
    private static String describeRealCause(IOException e) {
        if (e instanceof CopyStreamException) {
            return "(real cause = " + ((CopyStreamException) e).getIOException() + ")";
        }
        return "";
    }

    /**
     * A human readable description of the object which should be sufficient to identify and track it. When no record
     * is attached to this instance (the field is transient), only the name is included.
     *
     * @return description of this object.
     */
    @Override
    public String toString() {
        if (record == null) {
            return "(" + name + ")";
        }
        return record.getHeader().getRecordIdentifier() + "_" + record.getHeader().getOffset() + "_" + "(" + name + ")";
    }
}