Unverified commit f9494404 authored by Niels Erik, committed by GitHub

Binary MARC read and encode issues, RIS-152 etc. (#61)

* Fixes for issues with binary MARC - RIS-152

  - In case of a MarcException, attempt to continue reading the
    stream if continue-on-errors is on.
  - Use MarcStreamReader instead of MarcPermissiveStreamReader
    for iterating over the MARC file.
  - Inject MarcPermissiveStreamReader into the process for reading
    AND converting single records to UTF-8 (see the sketch after
    this list).
  - Always keep the original MARC as a byte array (it is created
    anyway as input for the permissive reader).
  - Remove the option for a non-buffering input stream, since the
    code as written always applies buffering anyway.

* Downgrade the CompletePendingCommand timeout log message from error to warn
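For illustration, a minimal, self-contained sketch of the two-step reading described above (not the patch itself; the file name, encoding string, and class name are placeholders). The outer MarcStreamReader iterates the binary file; each record is serialized back to ISO 2709 bytes with MarcStreamWriter, and a MarcPermissiveStreamReader then re-reads those bytes to produce a UTF-8 record that is written as MARCXML into a DOM, mirroring the marc4j calls used in the diff below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;

import javax.xml.transform.dom.DOMResult;

import org.marc4j.MarcPermissiveStreamReader;
import org.marc4j.MarcReader;
import org.marc4j.MarcStreamReader;
import org.marc4j.MarcStreamWriter;
import org.marc4j.MarcXmlWriter;
import org.marc4j.marc.Record;

public class TwoStepMarcReadSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder input file and encoding; the harvester instead passes
        // the charset deduced from the file's mime type.
        String encoding = "MARC8";
        try (InputStream input = new FileInputStream("records.mrc")) {
            // Step 1: iterate the stream with the plain (non-permissive) reader.
            MarcReader reader = new MarcStreamReader(input, encoding);
            ByteArrayOutputStream singleRecordBytes = new ByteArrayOutputStream(20000);
            MarcStreamWriter iso2709writer = new MarcStreamWriter(singleRecordBytes);
            while (reader.hasNext()) {
                Record record = reader.next();

                // Serialize just this record back to ISO 2709 bytes ...
                singleRecordBytes.reset();
                iso2709writer.write(record);
                byte[] originalRecord = singleRecordBytes.toByteArray();

                // Step 2: ... and re-read it with the permissive reader, which
                // converts it to UTF-8 (both flags true, as in the patch).
                MarcReader convertingReader = new MarcPermissiveStreamReader(
                        new ByteArrayInputStream(originalRecord), true, true);
                Record converted = convertingReader.next();

                // Write the converted record as MARCXML into a DOM tree; the
                // harvester stores result.getNode() plus the original bytes.
                DOMResult result = new DOMResult();
                MarcXmlWriter xmlWriter = new MarcXmlWriter(result);
                xmlWriter.write(converted);
                xmlWriter.close();
            }
            iso2709writer.close();
        }
    }
}
```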
parent 23de2976
Pipeline #9770 failed
@@ -51,19 +51,15 @@ public class FtpInputStream extends InputStream {
@Override
public void close() throws IOException {
long elapsed = elapsed();
if (elapsed>ASSUMED_IDLING_TIMEOUT_MS) {
logger.debug("FTP server may have timed out operation after " + (elapsed / 1000) + " seconds");
}
logger.debug("Closing input");
input.close();
logger.debug("Input closed. CompletePendingCommand?");
ExecutorService executor = Executors.newCachedThreadPool();
Callable<Object> task = new Callable<Object>() {
public Object call() {
try {
return client.completePendingCommand();
} catch (IOException ioe) {
logger.error("CompletePendingCommand failed with "+ ioe.getMessage());
logger.warn("CompletePendingCommand did not complete "+ ioe.getMessage());
return false;
}
}
@@ -72,17 +68,17 @@ public class FtpInputStream extends InputStream {
try {
if (elapsed>ASSUMED_IDLING_TIMEOUT_MS) {
logger.info("Attempting completePendingCommand with a " + COMPLETE_PENDING_COMMAND_TIMEOUT_MINUTES + " minute timeout");
logger.info("Calling CompletePendingCommand but FTP server may have timed out operation after " + (elapsed / 1000) + " seconds");
}
Boolean result = (Boolean) future.get(COMPLETE_PENDING_COMMAND_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!result) {
throw new IOException("CompletePendingCommand failed");
throw new IOException("CompletePendingCommand timed out");
}
} catch (TimeoutException ex) {
if (elapsed>ASSUMED_IDLING_TIMEOUT_MS) {
logger.info("CompletePendingCommand did not return in " + COMPLETE_PENDING_COMMAND_TIMEOUT_MINUTES + " minutes. Cancelled, disconnecting.");
} else {
logger.error("CompletePendingCommand did not return in " + COMPLETE_PENDING_COMMAND_TIMEOUT_MINUTES + " minutes. Cancelled, disconnecting.");
logger.warn("CompletePendingCommand did not return in " + COMPLETE_PENDING_COMMAND_TIMEOUT_MINUTES + " minutes. Cancelled, disconnecting.");
}
client.disconnect();
} catch (InterruptedException e) {
@@ -3,6 +3,7 @@ package com.indexdata.masterkey.localindices.client;
import static com.indexdata.utils.TextUtils.joinPath;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
@@ -14,6 +15,7 @@ import java.net.URL;
import java.net.URLConnection;
import java.text.ParseException;
import java.util.Date;
import java.util.Locale;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;
@@ -21,6 +23,7 @@ import javax.xml.transform.dom.DOMResult;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.net.ftp.FTPConnectionClosedException;
import org.apache.log4j.Level;
import org.marc4j.*;
import org.xml.sax.SAXException;
@@ -85,18 +88,27 @@ public class XmlMarcClient extends AbstractHarvestClient {
}
}
while (iterator.hasNext()) {
logger.debug("Getting next file or directory from " + file.getName());
count += download(iterator.getNext());
logger.debug("Got " + count + " files from directory structure");
}
} else {
logger.info("Begin processing of "+file.getName());
try {
storeAny(file, proposeCachePath());
logger.debug("Storing " + file.getName() + " done");
logger.debug("Done storing " + file.getName());
} catch (IOException ioe) {
logger.error("Failure storing " + file.getName() + ": " + ioe.getMessage() +
(getResource().getAllowErrors() ? ". Continuing since job is set to proceed on errors." : ""));
if (!getResource().getAllowErrors()) {
throw ioe;
if (ioe.getMessage() != null && ioe.getMessage().toLowerCase( Locale.ROOT ).contains("completependingcommand")) {
logger.warn("Problem with CompletingPendingCommand after download of " + file.getName() + ": " + ioe.getMessage());
logger.debug("Done storing " + file.getName());
} else {
logger.error(
"Failure storing " + file.getName() + ": " + ioe.getMessage() + ( getResource().getAllowErrors() ? ". Continuing since job is set to proceed on errors." : "" ) );
if ( !getResource().getAllowErrors() )
{
throw ioe;
}
}
}
count++;
@@ -228,30 +240,22 @@ public class XmlMarcClient extends AbstractHarvestClient {
return 0;
}
private void storeAny(RemoteFile file, String cacheFile) throws IOException {
storeAny(file, cacheFile, true);
}
private void storeAny(RemoteFile file, String cacheFile, boolean shouldBuffer) throws IOException {
private void storeAny(RemoteFile file, String cacheFile) throws IOException {
InputStream input;
try {
input = shouldBuffer
? new BufferedInputStream(file.getInputStream())
: file.getInputStream();
logger.info("File: " + file.getName() + ". Length: " + file.getLength());
input = new BufferedInputStream(file.getInputStream());
} catch (IOException ioe) {
if (ioe.getMessage().equals("Connection is not open")) {
logger.error("Connection is not open, may have timed out during download of previous file, attempting to reconnect to download " + file.getName());
((FtpClientTransport)clientTransport).reconnect(10000);
logger.info("Continue processing of " + file.getName());
input = shouldBuffer
? new BufferedInputStream(file.getInputStream())
: file.getInputStream();
input = new BufferedInputStream(file.getInputStream());
} else {
throw ioe;
}
}
MimeTypeCharSet mimeType = deduceMimeType(input, file.getName(), file.getContentType());
EntryFilter excludefilter = new EntryFilterExcludePattern(((XmlBulkResource) resource).getExcludeFilePattern(),logger);
EntryFilter includefilter = new EntryFilterIncludePattern(((XmlBulkResource) resource).getIncludeFilePattern(),logger);
EntryFilter entryFilter = new CompositeEntryFilter(excludefilter,includefilter);
@@ -293,7 +297,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
int count = 0;
while (it.hasNext()) {
RemoteFile rf = it.getNext();
logger.info("Found harvestable file: "+rf.getName());
logger.info("Found harvestable file in TAR archive: "+rf.getName());
storeAny(rf, proposeCachePath());
count++;
}
@@ -328,7 +332,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
}
try {
if (mimeType.isMimeType("application/marc")) {
logger.debug("Setting up Binary MARC reader (" + mimeType + ")");
logger.debug("Setting up binary MARC reader (mime type " + mimeType + ", encoding: [" + mimeType.getCharset() +"])");
storeMarc(input, mimeType.getCharset());
} else if (mimeType.isXML()) {
logger.debug("XmlMarcClient setting up XML reader (" + mimeType + ")");
@@ -348,7 +352,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
// FTPConnectionClosedException will be thrown here.
// This exception is used when deciding whether to attempt a reconnect.
// See download(URL url)
logger.info("Done reading " + file.getName() + ". In finally block of XmlMarcClient.storeAny(), closing input");
logger.info("Done reading " + file.getName() + ". Closing input.");
input.close();
logger.debug("StoreAny closed input stream");
}
@@ -500,7 +504,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
private void storeMarc(InputStream input, String encoding) throws
IOException {
MarcPermissiveStreamReader reader = new MarcPermissiveStreamReader(input,true, true);
MarcReader reader = new MarcStreamReader( input, encoding );
boolean isTurboMarc = false;
//check what MARC output we want
MimeTypeCharSet mimeType =
@@ -516,43 +520,54 @@ public class XmlMarcClient extends AbstractHarvestClient {
RecordStorage storage = job.getStorage();
logger.debug("Storage is " + storage.getClass().getName());
ByteArrayOutputStream baos = null;
ByteArrayOutputStream singleRecordByteStream = null;
MarcStreamWriter iso2709writer = null;
if (resource.isStoreOriginal()) {
logger.debug("Initializing writer for original iso2709 marc file");
baos = new ByteArrayOutputStream(20000);
iso2709writer = new MarcStreamWriter(baos);
}
singleRecordByteStream = new ByteArrayOutputStream(20000);
iso2709writer = new MarcStreamWriter(singleRecordByteStream);
while (reader.hasNext()) {
try {
logger.debug("Reading next marc record in file");
org.marc4j.marc.Record record = reader.next();
// Reset single record stream before each record is written
singleRecordByteStream.reset();
// Write the single record to stream.
iso2709writer.write(record);
// Read the single record with permissive, UTF-8 converting reader.
byte[] singleRecord = singleRecordByteStream.toByteArray();
ByteArrayInputStream recordAsInputStream = new ByteArrayInputStream( singleRecord );
MarcReader convertingReader = new MarcPermissiveStreamReader( recordAsInputStream, true, true );
// Note: Permissive Reader cannot be used for the outer iteration instead of MarcStreamReader since it will
// stop reading certain MARC files mid-file and without a message. This is NOT a problem when using
// MarcPermissiveStreamReader on a regular InputStream but it _is_ a problem with the home-grown
// FtpInputStream that must be used for other reasons (the process for other file types breaks without it).
// Thus the two-step reading of binary MARC streams.
//
// Note: No other good solution has been identified for converting incoming binary MARC to UTF-8 XML with
// this version of Marc4J.
//
// Note: This nine-year-old version of Marc4J must be used because it has support for TurboMarc.
org.marc4j.marc.Record convertedRecord = convertingReader.next();
// Write to XML
DOMResult result = new DOMResult();
if (isTurboMarc) {
writer = new TurboMarcXmlWriter(result);
} else {
writer = new MarcXmlWriter(result);
}
writer.write(record);
if (baos != null && iso2709writer != null) {
baos.reset();
iso2709writer.write(record);
logger.debug("Wrote MARC record to buffer: " + record.toString());
}
writer.write(convertedRecord);
writer.close();
// Store or delete
if (record.getLeader().getRecordStatus() == 'd') {
logger.debug("Removing record from storage");
logger.debug("XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
logger.log(Level.TRACE,"XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
Node node = result.getNode();
RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null,
node, baos != null ? baos.toByteArray() : null);
RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null, node, singleRecord );
storage.delete(rdi);
} else {
logger.debug("Adding new Record to storage");
logger.debug("XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
logger.log( Level.TRACE, "Adding new Record to storage");
logger.log( Level.TRACE, "XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
Node node = result.getNode();
RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null,
node, baos != null ? baos.toByteArray() : null);
RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null, node, singleRecord);
storage.add(rdi);
}
if (job.isKillSent()) {
@@ -570,7 +585,14 @@ public class XmlMarcClient extends AbstractHarvestClient {
logger.error("Received EOF when reading record # " + index);
throw e;
}
break;
if (getResource().getAllowErrors()) {
continue;
} else {
break;
}
} catch (RuntimeException re) {
logger.error("XmlMarcClient, storeMarc(), Runtime exception: " + re.getMessage());
throw(re);
}
if ((++index) % 1000 == 0) {
logger.info("MARC records read: " + index);
@@ -578,6 +600,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
}
if (iso2709writer != null) iso2709writer.close();
logger.info("MARC records read: " + index);
}
private void storeXml(InputStream is) throws IOException {