Commit ddb873ec authored by Niels Erik G. Nielsen

Issues with binary MARC and large FTP jobs to FOLIO Inventory

  - properly reconnect after FTP connection timeouts between large files
  - XmlMarcClient sends deletes by record instead of by ID to storage
  - XmlMarcClient detects deletes by LEADER 05 (status) instead of 06 (type); see the sketch below
  - stop printing each binary MARC file to System.out
  - remove username/password from logged FTP URLs

  Also fixes name of holdings records metrics (should be HOLDINGS_RECORD now)
parent 672baf5c
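For context on the LEADER change: in a MARC 21 leader, byte 05 is the record status ('d' = deleted) while byte 06 is the type of record, so testing byte 06 flags the wrong records as deletes. A minimal sketch of the corrected check, using marc4j (the MARC library this code already uses); the sample leader string is illustrative only:

import org.marc4j.marc.Leader;
import org.marc4j.marc.MarcFactory;

public class LeaderDeleteCheck {
    public static void main(String[] args) {
        // Illustrative 24-byte leader: index 5 (status) is 'd', index 6 (type) is 'a'
        Leader leader = MarcFactory.newInstance().newLeader("00714dam a2200205 a 4500");
        // Correct check: LEADER 05, the record status
        boolean isDelete = leader.getRecordStatus() == 'd';
        // Wrong byte for delete detection: LEADER 06, the record type
        char typeOfRecord = leader.getTypeOfRecord();
        System.out.println("status=" + leader.getRecordStatus()
            + " type=" + typeOfRecord + " isDelete=" + isDelete);
    }
}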
@@ -42,13 +42,14 @@ public class FtpClientTransport implements ClientTransport {
}
connect(ftpUrl);
login(ftpUrl.getUserInfo());
logger.info("Reconnected to " + ftpUrl.getHost());
}
@Override
public void connect(URL ftpUrl) throws IOException {
if (client == null) {
client = new FTPClient();
this.ftpUrl = ftpUrl;
}
String host = ftpUrl.getHost();
int port = (ftpUrl.getPort() != -1 ? ftpUrl.getPort() : ftpUrl.getDefaultPort());
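The hunk above shows only the tail of the reconnect method (connect, login, log). For orientation, here is a hedged sketch of what the full reconnect(int) plausibly looks like, given the reconnect(10000) calls in XmlMarcClient below; the sleep and the stale-client disconnect are assumptions, not code from this commit:

public void reconnect(int sleepMillis) throws IOException {
    try {
        // Assumed: give the server a moment before re-establishing the session
        Thread.sleep(sleepMillis);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
    if (client != null && client.isConnected()) {
        try {
            client.disconnect(); // assumed: drop the stale control connection first
        } catch (IOException ioe) {
            logger.warn("Could not disconnect stale FTP client: " + ioe.getMessage());
        }
    }
    connect(ftpUrl);             // visible in the hunk above
    login(ftpUrl.getUserInfo()); // visible in the hunk above
    logger.info("Reconnected to " + ftpUrl.getHost());
}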
@@ -60,8 +61,8 @@ public class FtpClientTransport implements ClientTransport {
if (client.isConnected()) {
logger.debug("Client is connected to " + host + serverReply);
} else {
logger.error("Error connecting to " + ftpUrl.toString() + serverReply);
throw new IOException("Error connecting to " + ftpUrl.toString() + serverReply);
logger.error("Error connecting to " + ftpUrl.getHost() + serverReply);
throw new IOException("Error connecting to " + ftpUrl.getHost() + serverReply);
}
if (usePassive) {
client.enterLocalPassiveMode();
@@ -138,7 +139,7 @@ public class FtpClientTransport implements ClientTransport {
} else {
logger.debug("Found " + files.length + " file(s) at " + path);
}
logger.debug("Creating new FTPRemoteFileIterator with url " + url.toString());
logger.debug("Creating new FTPRemoteFileIterator with url " + url.getHost());
return new FtpRemoteFileIterator(this, url, files, fileFilter, logger);
}
......
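The logging changes in this file avoid leaking credentials by logging only url.getHost(). If the full URL were still wanted in logs, a small helper could mask just the user-info portion instead; this is a hypothetical alternative, not part of the commit:

import java.net.URL;

public class UrlMasking {
    // Hypothetical helper: ftp://user:secret@host/path -> ftp://***@host/path
    static String maskUserInfo(URL url) {
        String userInfo = url.getUserInfo();
        if (userInfo == null) {
            return url.toString();
        }
        return url.toString().replace(userInfo + "@", "***@");
    }

    public static void main(String[] args) throws Exception {
        URL url = new URL("ftp://user:secret@ftp.example.com/marc/records.mrc");
        System.out.println(maskUserInfo(url));
    }
}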
@@ -55,6 +55,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
private String errors = errorText;
private final Date lastFrom;
private final static int defaultSplitAt = 0;
+private ClientTransport clientTransport;
public XmlMarcClient(XmlBulkResource resource, BulkRecordHarvestJob job,
Proxy proxy, StorageJobLogger logger, DiskCache dc, Date lastRequested) {
@@ -75,15 +76,34 @@ public class XmlMarcClient extends AbstractHarvestClient {
public int download(RemoteFile file) throws IOException {
int count = 0;
if (file.isDirectory()) {
-RemoteFileIterator iterator = file.getIterator();
+RemoteFileIterator iterator;
+try {
+  iterator = file.getIterator();
+} catch (IOException ioe) {
+  if (ioe.getMessage().equals("Connection is not open")) {
+    logger.error("Connection is not open, may have timed out during download of previous file; reconnecting to recurse into " + file.getName());
+    ((FtpClientTransport) clientTransport).reconnect(10000);
+    iterator = file.getIterator();
+  } else {
+    logger.error("Failed to recurse into " + file.getName() + ": " + ioe.getMessage());
+    throw ioe;
+  }
+}
while (iterator.hasNext()) {
count += download(iterator.getNext());
}
} else {
logger.info("Found harvestable file: "+file.getName());
logger.debug("Invoking storeAny");
storeAny(file, proposeCachePath());
logger.debug("storeAny finished");
logger.info("Begin processing of "+file.getName());
try {
storeAny(file, proposeCachePath());
logger.debug("Storing " + file.getName() + " done");
} catch (IOException ioe) {
logger.error("Failure storing " + file.getName() + ": " + ioe.getMessage() +
(getResource().getAllowErrors() ? ". Continuing since job is set to proceed on errors." : ""));
if (!getResource().getAllowErrors()) {
throw ioe;
}
}
count++;
}
return count;
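The reconnect-and-retry-once pattern above reappears in storeAny(...) further down. A hypothetical consolidation (not in this commit) could look like the following, reusing the clientTransport field and the reconnect(10000) call from this class:

// Hypothetical helper, not part of this commit: run an FTP operation and,
// on the "Connection is not open" timeout symptom, reconnect once and retry.
private <T> T withFtpRetry(java.util.concurrent.Callable<T> op, String what) throws IOException {
    try {
        return op.call();
    } catch (IOException ioe) {
        if ("Connection is not open".equals(ioe.getMessage())) {
            logger.error("Connection is not open; reconnecting, then retrying " + what);
            ((FtpClientTransport) clientTransport).reconnect(10000);
            try {
                return op.call();
            } catch (IOException retryFailed) {
                throw retryFailed;
            } catch (Exception e) {
                throw new IOException("Retry of " + what + " failed: " + e.getMessage(), e);
            }
        }
        throw ioe;
    } catch (Exception e) {
        throw new IOException(what + " failed: " + e.getMessage(), e);
    }
}

With such a helper, the directory branch above could read: iterator = withFtpRetry(file::getIterator, "listing of " + file.getName());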
@@ -99,9 +119,9 @@ public class XmlMarcClient extends AbstractHarvestClient {
public int download(URL url) throws Exception {
ClientTransportFactory factory = new ResourceClientTransportFactory(
(XmlBulkResource) resource, logger);
logger.info("Preparing retrieval of " + url);
logger.info("Preparing retrieval from " + url.getHost());
try {
ClientTransport clientTransport = factory.lookup(url);
clientTransport = factory.lookup(url);
clientTransport.setFromDate(lastFrom);
clientTransport.setIncludeFilePattern(getResource().getIncludeFilePattern());
clientTransport.setExcludeFilePattern(getResource().getExcludeFilePattern());
@@ -120,52 +140,45 @@ public class XmlMarcClient extends AbstractHarvestClient {
logger.info("Found subfolder '"+rf.getName()+"' but recursion is off, ignoring.");
}
} else {
logger.debug("XmlMarcClient downloading " + rf.getAbsoluteName());
logger.info("XmlMarcClient downloading " + rf.getAbsoluteName());
download(rf);
logger.debug("XmlMarcClient done downloading");
}
} catch (FTPConnectionClosedException fcce) {
logger.debug("XmlMarcClient caught FTPConnectionClosedException");
logger.error("XmlMarcClient caught FTPConnectionClosedException");
if (getResource().getAllowErrors() && !job.isKillSent()) {
retryFtpDownload(clientTransport, rf, fcce);
} else {
throw fcce;
}
} catch (SocketException se) {
logger.debug("XmlMarcClient caught SocketException");
logger.error("XmlMarcClient caught SocketException");
if (getResource().getAllowErrors() && !job.isKillSent() && clientTransport instanceof FtpClientTransport) {
retryFtpDownload(clientTransport, rf, se);
} else {
throw se;
}
} catch (StopException stoppedAtLimit) {
logger.debug("Stop Exception: " + stoppedAtLimit.getMessage());
logger.info("Stop Exception: " + stoppedAtLimit.getMessage());
throw stoppedAtLimit;
} catch (IOException ioe) {
+if (ioe.getMessage().equals("Connection is not open")) {
+  // Could happen if most recent file was large enough for the server to hit some timeout
+  // (typically after 900 seconds of idling)
+  retryFtpDownload(clientTransport, rf, ioe);
+} else if (job.isKillSent()) {
+  logger.info("Detected kill sent during download exception handling. Exception was " + ioe.getMessage());
+  throw ioe;
+} else if (getResource().getAllowErrors()) {
+  logger.error("Problem occurred during download/store [" + ioe.getMessage() + " " + ioe.getCause() + "] but job set to continue on errors");
+  logger.warn(errorText + rf.getAbsoluteName() + ". Error: " + ioe.getMessage());
+  setErrors(getErrors() + (rf.getAbsoluteName() + " "));
+} else {
-if (job.isKillSent()) {
-  throw ioe;
-}
-logger.info("Problem occurred during download/store: " + ioe.getMessage() + ioe);
-logger.info("Cause: " + ioe.getCause());
-if (getResource().getAllowErrors()) {
-  logger.warn(errorText + rf.getAbsoluteName() + ". Error: " + ioe.getMessage());
-  logger.debug("Cause", ioe);
-  setErrors(getErrors() + (rf.getAbsoluteName() + " "));
-} else {
-  throw ioe;
-}
+  logger.error("Problem occurred during download/store of " + rf.getName() + " [" + ioe.getMessage() + " " + ioe.getCause() + "] and job NOT set to continue on errors");
+  throw ioe;
+}
}
} catch (Exception e) {
logger.debug("XmlMarcClient caught Exception " + (job.isKillSent()? " job killed(1)." : ""));
logger.debug("XmlMarcClient caught Exception processing " + rf.getName() + (job.isKillSent()? ": job killed(1)." : ""));
if (job.isKillSent()) {
throw e;
}
logger.info("Problem occurred during download/store: " + e.getMessage() + e);
logger.info("Problem occurred during download/store of " + rf.getName() + ": " + e.getMessage() + e);
logger.info("Cause: " + e.getCause());
if (getResource().getAllowErrors()) {
logger.warn(errorText + rf.getAbsoluteName() + ". Error: " + e.getMessage());
@@ -175,36 +188,35 @@ public class XmlMarcClient extends AbstractHarvestClient {
throw e;
}
}
logger.debug("XmlMarcClient done iterating file list");
logger.debug("XmlMarcClient done iterating file list at " + url.getHost());
}
} else {
getJob().setStatus(HarvestStatus.OK, "Found no files at "+url+ (getResource().getAllowCondReq() ? ", possibly due to filtering. " : ""));
}
} catch (ClientTransportError cte) {
logger.debug("XmlMarcClient caught ClientTransportError");
if (getResource().getAllowErrors()) {
setErrors("ClientTransportError, " + getErrors() + (url.toString() + " "));
setErrors("ClientTransportError [" + getErrors() + "] retrieving from " + url.getHost() + " but job set to continue on errors");
return 1;
} else {
logger.debug("XmlMarcClient caught ClientTransportError while retrieving from " + url.getHost() + " and job NOT set to continue on errors");
throw cte;
}
}
logger.debug("XmlMarcClient sleeping for 2 seconds");
// TODO HACK HACK HACK
Thread.sleep(2000);
logger.info("Finished retrieval of " + url.toString());
logger.info("Finished retrieval from " + url.getHost());
} catch (StopException ex) {
logger.info("Stop requested. Reason: " + ex.getMessage());
return 0;
} catch (Exception ex) {
logger.debug("XmlMarcClient caught exception" + (job.isKillSent() ? ": job killed(2)" : ""));
logger.debug("XmlMarcClient caught exception" + (job.isKillSent() ? ": job killed(2)" : "") + " when retrieving from " + url.getHost());
if (job.isKillSent()) {
throw ex;
}
if (getResource().getAllowErrors()) {
-logger.warn(errorText + url.toString() + ". Error: " + ex.getMessage());
-logger.debug("Cause", ex);
-setErrors(getErrors() + (url.toString() + " "));
+logger.warn(errorText + url.getHost() + ". Error: " + ex.getMessage() + " " + ex.getCause());
+setErrors(getErrors() + (url.getHost() + " "));
return 1;
}
throw ex;
@@ -225,13 +237,24 @@ public class XmlMarcClient extends AbstractHarvestClient {
storeAny(file, cacheFile, true);
}
-private void storeAny(RemoteFile file, String cacheFile, boolean shouldBuffer) throws
-    EOFException, FTPConnectionClosedException, IOException {
-  //buffer reads
-  InputStream input = shouldBuffer
-      ? new BufferedInputStream(file.getInputStream())
-      : file.getInputStream();
+private void storeAny(RemoteFile file, String cacheFile, boolean shouldBuffer) throws IOException {
+  InputStream input;
+  try {
+    input = shouldBuffer
+        ? new BufferedInputStream(file.getInputStream())
+        : file.getInputStream();
+  } catch (IOException ioe) {
+    if (ioe.getMessage().equals("Connection is not open")) {
+      logger.error("Connection is not open, may have timed out during download of previous file, attempting to reconnect to download " + file.getName());
+      ((FtpClientTransport) clientTransport).reconnect(10000);
+      logger.info("Continue processing of " + file.getName());
+      input = shouldBuffer
+          ? new BufferedInputStream(file.getInputStream())
+          : file.getInputStream();
+    } else {
+      throw ioe;
+    }
+  }
MimeTypeCharSet mimeType = deduceMimeType(input, file.getName(), file.getContentType());
EntryFilter excludefilter = new EntryFilterExcludePattern(((XmlBulkResource) resource).getExcludeFilePattern(),logger);
@@ -330,9 +353,9 @@ public class XmlMarcClient extends AbstractHarvestClient {
// FTPConnectionClosedException will be thrown here.
// This exception is used when deciding whether to attempt a reconnect.
// See download(URL url)
logger.debug("In XmlMarcClient, storeAny, finally block. Attempting to close input");
logger.info("Done reading " + file.getName() + ". In finally block of XmlMarcClient.storeAny(), closing input");
input.close();
logger.debug("StoreAny: Input stream closed.");
logger.debug("StoreAny closed input stream");
}
}
@@ -524,16 +547,19 @@ public class XmlMarcClient extends AbstractHarvestClient {
logger.debug("Wrote MARC record to buffer: " + record.toString());
}
writer.close();
-if (record.getLeader().getTypeOfRecord() == 'd') {
-  storage.delete(record.getControlNumber());
+if (record.getLeader().getRecordStatus() == 'd') {
+  logger.debug("Removing record from storage");
+  logger.debug("XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
+  Node node = result.getNode();
+  RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null,
+      node, baos != null ? baos.toByteArray() : null);
+  storage.delete(rdi);
} else {
logger.debug("Adding new Record to storage");
logger.debug("XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
Node node = result.getNode();
RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null,
node, baos != null ? baos.toByteArray() : null);
//logger.debug("Value of isCollection is " + rdi.isCollection());
//logger.debug("Value of subRecords is " + rdi.getSubRecords());
storage.add(rdi);
}
if (job.isKillSent()) {
@@ -548,7 +574,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
logger.error("Cause: " + e.getCause().getMessage(), e.getCause());
}
if (e.getCause() instanceof EOFException) {
logger.warn("Received EOF when reading record # " + index);
logger.error("Received EOF when reading record # " + index);
throw e;
}
break;
@@ -593,7 +619,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
*/
private void retryFtpDownload(ClientTransport clientTransport, RemoteFile rf, IOException ex) throws ClientTransportError {
logger.warn(errorText + rf.getAbsoluteName() + ". Error: " + ex.getMessage());
logger.debug("Cause", ex);
logger.debug("Cause" + ex.getCause() + ex.getMessage());
setErrors(getErrors() + (rf.getAbsoluteName() + " "));
logger.info("Connection lost. Attempting reconnect in 10 seconds.");
try {
@@ -608,11 +634,12 @@ public class XmlMarcClient extends AbstractHarvestClient {
} else {
throw new ClientTransportError("Attempt to reconnect and download failed: " + ioe.getMessage());
}
+} else {
+  logger.error("retryFtpDownload detected killSent - canceling retry");
}
}
}
private void storeCSV(InputStream input, MimeTypeCharSet mt) throws IOException {
MessageConsumer mc = new RecordStorageConsumer(job.getStorage(), job.getLogger(), resource.isStoreOriginal());
try {
......
@@ -101,10 +101,27 @@ public class TransformationRecordStorageProxy extends AbstractTransformationReco
@Override
public void delete(String id) {
+RecordDOMImpl recordDOM = new RecordDOMImpl(id, null, null, null);
+while (true) {
if (job.isKillSent())
throw new RuntimeException("Job killed");
-getTarget().delete(id);
-throw new RuntimeException("Job killed");
+try {
+  Record transformed = transformNode(recordDOM);
+  if (transformed != null)
+    getTarget().delete(transformed.getId());
+  else {
+    logger.warn("Record filtered out. " + recordDOM);
+  }
+  break;
+} catch (InterruptedException e) {
+  e.printStackTrace();
+  try {
+    errors.put(e);
+  } catch (InterruptedException e1) {
+    logger.error("Record not added to error. " + recordDOM);
+    e1.printStackTrace();
+  }
+}
+}
}
......
@@ -92,6 +92,7 @@ import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
inventoryRecordSet.put("instance", transformedRecord.getInstance());
inventoryRecordSet.put("holdingsRecords", transformedRecord.getHoldings());
JSONObject responseJson = upsertInventoryRecordSet(inventoryRecordSet);
+logger.log(Level.TRACE, "Response was: " + responseJson.toJSONString());
UpsertMetrics metrics = new UpsertMetrics((JSONObject)responseJson.get("metrics"));
if (ctxt.harvestable.isStoreOriginal()) {
@@ -217,7 +218,7 @@ import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
json = upsertMetricsJson;
}
instance = new EntityMetrics((JSONObject) json.get("INSTANCE"));
holdingsRecord = new EntityMetrics((JSONObject) json.get("HOLDINGSRECORD"));
holdingsRecord = new EntityMetrics((JSONObject) json.get("HOLDINGS_RECORD"));
item = new EntityMetrics((JSONObject) json.get("ITEM"));
}
@@ -1026,7 +1027,7 @@ import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
logger.log(Level.TRACE, "Delete request received: " + transformedRecord.getDelete().toJSONString());
JSONObject deletionJson = transformedRecord.getDelete();
if (ctxt.useInventoryUpsert) {
logger.info("Sending delete request to " + ctxt.inventoryUpsertUrl);
logger.info("Sending delete request " + transformedRecord.getJson().toJSONString() + " to " + ctxt.inventoryUpsertUrl);
//HttpEntityEnclosingRequestBase httpDelete = new HttpEntityEnclosingRequestBase(ctxt.inventoryUpsertUrl);
HttpDeleteWithBody httpDelete = new HttpDeleteWithBody(ctxt.inventoryUpsertUrl);
setHeaders(httpDelete,"application/json");
......
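The delete request above uses a custom HttpDeleteWithBody, since Apache HttpClient's stock HttpDelete cannot carry a request body. That class is not shown in this diff; the commented-out line above points at HttpEntityEnclosingRequestBase, and the usual implementation of this pattern is sketched below as an assumption about the codebase:

import java.net.URI;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;

// Common pattern for a DELETE request with a JSON body; sketch only,
// the actual class in this repository may differ in detail.
public class HttpDeleteWithBody extends HttpEntityEnclosingRequestBase {
    public static final String METHOD_NAME = "DELETE";

    public HttpDeleteWithBody(String uri) {
        super();
        setURI(URI.create(uri));
    }

    @Override
    public String getMethod() {
        return METHOD_NAME;
    }
}

The caller would then attach the deletion JSON with something like setEntity(new StringEntity(deletionJson.toJSONString(), ContentType.APPLICATION_JSON)) before executing the request.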
@@ -245,7 +245,6 @@ public class InventoryStorageController implements RecordStorage {
String holdingsRecordsMessage = "Holdings_records_processed/loaded/deleted/failed:__" + ctxt.updateCounters.holdingsRecordsProcessed + "___" + ctxt.updateCounters.holdingsRecordsLoaded + "___" + ctxt.updateCounters.holdingsRecordsDeleted + "___" + ctxt.updateCounters.holdingsRecordsFailed + "_";
String itemsMessage = "Items_processed/loaded/deleted/failed:__" + ctxt.updateCounters.itemsProcessed + "___" + ctxt.updateCounters.itemsLoaded + "___" + ctxt.updateCounters.itemsDeleted + "___" + ctxt.updateCounters.itemsFailed + "_";
String sourceRecordsMessage = "Source_records_processed/loaded/deleted/failed:__" + ctxt.updateCounters.sourceRecordsProcessed + "___" + ctxt.updateCounters.sourceRecordsLoaded + "___" + ctxt.updateCounters.sourceRecordsDeleted + "___" + ctxt.updateCounters.sourceRecordsFailed + "_";
logger.log((ctxt.updateCounters.instancesFailed>0 ? Level.WARN : Level.INFO), instancesMessage);
logger.log((ctxt.updateCounters.holdingsRecordsFailed>0 ? Level.WARN : Level.INFO), holdingsRecordsMessage);
logger.log((ctxt.updateCounters.itemsFailed>0 ? Level.WARN : Level.INFO), itemsMessage);
@@ -320,7 +319,7 @@ public class InventoryStorageController implements RecordStorage {
@Override
public void delete(String id) {
-throw new UnsupportedOperationException("delete by id not supported.");
+logger.error("Delete by ID [" + id + "] not supported for Inventory");
}
@Override
......
@@ -33,8 +33,6 @@ public class MarcToJson {
MarcWriter writer = new MarcXmlWriter(baos);
writer.write(record);
writer.close();
-System.out.print(baos.toString("UTF-8"));
-System.out.flush();
JSONObject json = MarcXMLToJson.convertMarcXMLToJson(baos.toString("UTF-8"));
jsonList.add(json);
}
......