Unverified commit 17eac641 authored by Niels Erik, committed by GitHub

Merge pull request #58 from indexdata/marcftp

Marcftp
parents ba9f684b db0533be
FROM jetty
FROM jetty:9.3-jre8
USER root
RUN mkdir -p /etc/masterkey/harvester
......
FROM jetty
FROM jetty:9.3-jre8
USER root
RUN mkdir -p /etc/masterkey/harvester
......
......@@ -138,6 +138,7 @@ public class FtpClientTransport implements ClientTransport {
} else {
logger.debug("Found " + files.length + " file(s) at " + path);
}
logger.debug("Creating new FTPRemoteFileIterator with url " + url.toString());
return new FtpRemoteFileIterator(this, url, files, fileFilter, logger);
}
......
......@@ -24,11 +24,24 @@ public class FtpRemoteFile extends RemoteFile {
this.logger = logger;
this.transport = transport;
//FTP needs root relative paths!
if (getAbsoluteName().startsWith("/")) {
rootRelPath = getAbsoluteName().substring(1);
} else {
rootRelPath = getAbsoluteName();
logger.debug("getAbsoluteName() returns " + getAbsoluteName() );
String parentPath = parent.getPath();
//if (getAbsoluteName().startsWith("/")) {
String pathCandidate = pathJoin(parentPath, basename(file.getName()));
if(pathCandidate.startsWith("/")) {
pathCandidate = pathCandidate.substring(1);
}
rootRelPath = pathCandidate;
}
private String pathJoin(String parent, String child) {
if(parent.endsWith("/")) {
parent = parent.substring(0, parent.length() - 1);
}
if(child.startsWith("/")) {
child = child.substring(1);
}
return parent + "/" + child;
}
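// Illustration (hypothetical values, for clarity only): pathJoin("/harvest/", "/file.mrc")
// trims the trailing slash from the parent and the leading slash from the child,
// returning "/harvest/file.mrc"; the constructor above then strips the remaining
// leading slash so rootRelPath becomes "harvest/file.mrc", the root-relative form
// the FTP server expects.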
@Override
......@@ -40,6 +53,7 @@ public class FtpRemoteFile extends RemoteFile {
public synchronized InputStream getInputStream() throws FTPConnectionClosedException, IOException {
FTPClient client = ((FtpClientTransport)transport).getClient();
client.setFileType(FTP.BINARY_FILE_TYPE);
logger.debug("Getting input stream for rootRelPath " + rootRelPath);
InputStream data = client.retrieveFileStream(rootRelPath);
if (data == null) {
String reply = client.getReplyString();
......
......@@ -45,8 +45,10 @@ import com.indexdata.masterkey.localindices.harvest.job.StorageJobLogger;
import com.indexdata.masterkey.localindices.harvest.storage.RecordDOMImpl;
import com.indexdata.masterkey.localindices.harvest.storage.RecordStorage;
import com.indexdata.masterkey.localindices.harvest.storage.XmlSplitter;
import com.indexdata.masterkey.localindices.util.TextUtils;
import com.indexdata.xml.filter.MessageConsumer;
import com.indexdata.xml.filter.SplitContentHandler;
import org.w3c.dom.Node;
public class XmlMarcClient extends AbstractHarvestClient {
private String errorText = "Failed to download/parse/store : ";
......@@ -116,7 +118,8 @@ public class XmlMarcClient extends AbstractHarvestClient {
logger.info("Found subfolder '"+rf.getName()+"' but recursion is off, ignoring.");
}
} else {
download(rf);
logger.debug("Downloading " + rf.getAbsoluteName());
download(rf);
}
} catch (FTPConnectionClosedException fcce) {
if (getResource().getAllowErrors() && !job.isKillSent()) {
......@@ -203,6 +206,7 @@ public class XmlMarcClient extends AbstractHarvestClient {
//TODO RemoteFile abstraction is not good enough to make this clean
//some transports may deal with compressed files (e.g http) others may not
//if we end up with a compressed mimetype we need to decompress
logger.debug("mimeType is " + mimeType.getMimeType());
if (mimeType.isZip()) {
logger.debug("Transport returned ZIP compressed file, expanding..");
ZipInputStream zis = new ZipInputStream(input);
......@@ -455,15 +459,18 @@ public class XmlMarcClient extends AbstractHarvestClient {
long index = 0;
MarcWriter writer;
RecordStorage storage = job.getStorage();
logger.debug("Storage is " + storage.getClass().getName());
ByteArrayOutputStream baos = null;
MarcStreamWriter iso2709writer = null;
if (resource.isStoreOriginal()) {
logger.debug("Initializing writer for original iso2709 marc file");
baos = new ByteArrayOutputStream(20000);
iso2709writer = new MarcStreamWriter(baos);
}
while (reader.hasNext()) {
try {
logger.debug("Reading next marc record in file");
org.marc4j.marc.Record record = reader.next();
DOMResult result = new DOMResult();
if (isTurboMarc) {
......@@ -475,13 +482,20 @@ public class XmlMarcClient extends AbstractHarvestClient {
if (baos != null && iso2709writer != null) {
baos.reset();
iso2709writer.write(record);
logger.debug("Wrote MARC record to buffer: " + record.toString());
}
writer.close();
if (record.getLeader().getTypeOfRecord() == 'd') {
storage.delete(record.getControlNumber());
} else {
storage.add(new RecordDOMImpl(record.getControlNumber(), null, result.
getNode(), baos != null ? baos.toByteArray() : null));
logger.debug("Adding new Record to storage");
logger.debug("XML value of Record is " + TextUtils.nodeToXMLString(result.getNode()));
Node node = result.getNode();
RecordDOMImpl rdi = new RecordDOMImpl(record.getControlNumber(), null,
node, baos != null ? baos.toByteArray() : null);
//logger.debug("Value of isCollection is " + rdi.isCollection());
//logger.debug("Value of subRecords is " + rdi.getSubRecords());
storage.add(rdi);
}
if (job.isKillSent()) {
// Close to end the pipe
......
......@@ -109,6 +109,7 @@ public class InstanceXmlToInstanceJsonTransformerRouter implements MessageRouter
private void consume(Object documentIn) {
if (documentIn instanceof Record) {
logger.debug("documentIn is of class " + documentIn.getClass().getName());
Record recordIn = (Record) documentIn;
RecordJSON recordOut = new RecordJSONImpl();
try {
......
package com.indexdata.masterkey.localindices.harvest.messaging;
import com.indexdata.masterkey.localindices.entity.TransformationStep;
import com.indexdata.masterkey.localindices.harvest.job.ErrorMessage;
import com.indexdata.masterkey.localindices.harvest.job.RecordHarvestJob;
import com.indexdata.masterkey.localindices.harvest.job.StorageJobLogger;
import com.indexdata.masterkey.localindices.harvest.storage.Record;
import com.indexdata.masterkey.localindices.harvest.storage.RecordDOM;
import com.indexdata.masterkey.localindices.harvest.storage.RecordDOMImpl;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
public class MarcXMLToOAIXMLTransformerRouter implements MessageRouter<Object> {
private MessageConsumer<Object> input;
private MessageProducer<Object> output;
private MessageProducer<Object> error;
private boolean running = true;
public static final String OAI_NAMESPACE = "http://www.openarchives.org/OAI/2.0/";
RecordHarvestJob job;
StorageJobLogger logger;
TransformationStep step;
public MarcXMLToOAIXMLTransformerRouter(TransformationStep step, RecordHarvestJob job) {
this.job = job;
this.logger = job.getLogger();
if (step instanceof TransformationStep) {
this.step = (TransformationStep)step;
} else {
throw new RuntimeException("Configuration error, 'step' must be of type TransformationStep");
}
}
@Override
public void setInput(MessageConsumer<Object> input) {
this.input = input;
}
@Override
public void setOutput(MessageProducer<Object> output) {
this.output = output;
}
@Override
public void setError(MessageProducer<Object> error) {
this.error = error;
}
@Override
public void shutdown() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void setThread(Thread thread) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void run() {
if(input == null) {
throw new RuntimeException("Input queue is not configured");
}
while(running) {
try {
consume(input.take());
} catch(InterruptedException ie) {
logger.warn("Interrupted while Running: " + running + ": " + ie.getLocalizedMessage());
}
}
}
@Override
public void onMessage(Object object) {
consume(object);
}
@Override
public Object take() throws InterruptedException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
private void consume(Object documentIn) {
if(!(documentIn instanceof Record)) {
return;
}
Record record = (Record)documentIn;
try {
Node recordNode = ((RecordDOM) record).toNode();
if(recordNode.getNodeType() == Node.DOCUMENT_NODE) {
recordNode = ((Document)recordNode).getDocumentElement();
}
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = dbFactory.newDocumentBuilder();
Document document = builder.newDocument();
Element root = document.createElementNS(OAI_NAMESPACE, "record");
document.appendChild(root);
Element metadata = document.createElementNS(OAI_NAMESPACE, "metadata");
root.appendChild(metadata);
Node recordNodeCopy = document.importNode(recordNode, true);
metadata.appendChild(recordNodeCopy);
RecordDOMImpl recordOut = new RecordDOMImpl(record.getId(), null, document,
record.getOriginalContent());
produce(recordOut);
} catch(Exception e) {
logger.error("Error in consume: ", e);
}
}
private void produce(Object documentOut) {
try {
output.put(documentOut);
} catch(InterruptedException ie) {
if(job.isKillSent()) {
return;
}
logger.error("Failed to put Result to output queue: Interrupted", ie);
try {
if(error != null) {
error.put(new ErrorMessage(documentOut, ie));
} else {
logger.error("No error queue, losing mesage: " + documentOut.toString());
}
} catch(InterruptedException ie2) {
logger.error("Failed to put Result on Error queue, losing mesage: "
+ documentOut.toString());
}
} catch(Exception e) {
logger.error("Error putting file on output queue: " + e.getLocalizedMessage());
}
}
}
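For reference, the consume() method above wraps each incoming record in an OAI envelope
whose serialized form looks roughly like this (the inner content is the imported source
record; element names and namespace are taken from the code above):

<record xmlns="http://www.openarchives.org/OAI/2.0/">
  <metadata>
    <!-- source record node copied here via document.importNode() -->
  </metadata>
</record>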
......@@ -79,15 +79,17 @@ public class XmlTransformerRouter implements MessageRouter {
logger.log(Level.TRACE, "Step " + this.step.getName() + " produced: " + sourceAsString(xmlSource));
DOMResult result = new DOMResult();
try {
transformer.transform(xmlSource, result);
logger.log(Level.TRACE, "Calling transformer of type " + transformer.getClass().getName());
transformer.transform(xmlSource, result);
} catch (Exception e) {
putError(xmlSource, e);
return;
putError(xmlSource, e);
return;
}
if (record instanceof RecordDOM) {
((RecordDOM) record).setNode(result.getNode());
} else
record = new RecordDOMImpl(record.getId(), record.getDatabase(), result.getNode(), record.getOriginalContent());
((RecordDOM) record).setNode(result.getNode());
} else {
record = new RecordDOMImpl(record.getId(), record.getDatabase(), result.getNode(), record.getOriginalContent());
}
produce(record);
}
}
......
......@@ -22,6 +22,7 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import ORG.oclc.oai.harvester2.verb.XPathHelper;
import com.indexdata.masterkey.localindices.util.TextUtils;
public class RecordDOMImpl extends RecordImpl implements RecordDOM {
private String pz2Namespace = "http://www.indexdata.com/pazpar2/1.0";
......@@ -122,6 +123,7 @@ public class RecordDOMImpl extends RecordImpl implements RecordDOM {
@Override
public boolean isCollection() {
if (node != null) {
logger.debug("Checking for collection with node structure " + TextUtils.nodeToXMLString(node));
Element root;
if (node.getNodeType() == Node.ELEMENT_NODE) {
root = (Element) node;
......@@ -130,6 +132,11 @@ public class RecordDOMImpl extends RecordImpl implements RecordDOM {
} else {
root = node.getOwnerDocument().getDocumentElement();
}
if(root == null) {
return false;
}
logger.debug("root tag name is " + root.getTagName());
logger.debug("root local name is " + root.getLocalName());
boolean isCollection = (
root != null
&& (root.getTagName().equals("collection") ||
......@@ -137,19 +144,36 @@ public class RecordDOMImpl extends RecordImpl implements RecordDOM {
);
return isCollection;
} else {
logger.debug("Attempted to check isCollection but node is null");
return false;
}
}
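// Illustration (hypothetical input): given a node such as
// <collection><record>...</record><record>...</record></collection>,
// isCollection() returns true because the root tag is "collection", and
// getSubRecords() below yields one Record per child <record> element.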
public Collection<Record> getSubRecords() {
if (!isCollection()) return null;
if (!isCollection()) {
logger.debug("Not collection, returning null");
return null;
}
logger.debug("Checking for sub records with node structure " + TextUtils.nodeToXMLString(node));
NodeList children = null;
if(node.getNodeType() == Node.DOCUMENT_NODE) {
logger.debug("Node is a document node");
children = ((Document) node).getDocumentElement().getChildNodes();
} else {
children = node.getChildNodes();
}
logger.debug(children.getLength() + " child nodes found");
/*
NodeList children = node.getNodeType() == Node.DOCUMENT_NODE
? ((Document) node).getDocumentElement().getChildNodes()
: node.getChildNodes();
? ((Document) node).getDocumentElement().getChildNodes()
: node.getChildNodes();
*/
List<Record> list = new ArrayList<Record>(children.getLength());
for (int i=0; i<children.getLength(); i++) {
for (int i = 0; i < children.getLength(); i++) {
Node child = children.item(i);
if (child.getNodeType() != Node.ELEMENT_NODE) continue;
if (child.getNodeType() != Node.ELEMENT_NODE) {
continue;
}
Element childElem = (Element) child;
//original record is set to null since at this point the collection
//is most likely transformed and we can't "extract' matching original rec
......
......@@ -16,6 +16,7 @@ import com.indexdata.masterkey.localindices.harvest.messaging.MessageProducer;
import com.indexdata.masterkey.localindices.harvest.messaging.MessageQueue;
import com.indexdata.masterkey.localindices.harvest.messaging.MessageRouter;
import com.indexdata.masterkey.localindices.harvest.messaging.RouterFactory;
import org.apache.log4j.Level;
public class TransformationRecordStorageProxy extends AbstractTransformationRecordStorageProxy {
private StorageJobLogger logger;
......@@ -42,15 +43,16 @@ public class TransformationRecordStorageProxy extends AbstractTransformationReco
}
protected Record transformNode(Record record) throws InterruptedException {
logger.log(Level.TRACE, "source is of type " + source.getClass().getName());
source.put(record);
if (!result.isEmpty()) {
Object obj = result.take();
count++;
if (obj instanceof Record)
return (Record) obj;
else {
logger.error("Unknown type to add: " + obj.getClass() + " " + obj.toString());
}
Object obj = result.take();
count++;
if (obj instanceof Record) {
return (Record) obj;
} else {
logger.error("Unknown type to add: " + obj.getClass() + " " + obj.toString());
}
}
return null;
}
......@@ -60,13 +62,18 @@ public class TransformationRecordStorageProxy extends AbstractTransformationReco
String msg = "Stop requested after " + limit + " records";
logger.info(msg);
throw new StopException(msg);
} else {
logger.log(Level.TRACE, "limit is " + limit + " and count is " + count);
}
}
@Override
public void add(Record record) {
if (job.isKillSent())
if (job.isKillSent()) {
throw new RuntimeException("Job killed");
}
logger.debug("TransformationRecordStorageProxy adding record of class "
+ record.getClass().getName() + ", target is of class " + getTarget().getClass().getName());
RecordDOMImpl recordDOM = new RecordDOMImpl(record);
while (true) {
try {
......@@ -167,8 +174,10 @@ public class TransformationRecordStorageProxy extends AbstractTransformationReco
for (TransformationStep step : steps) {
MessageRouter<Object> router = factory.create(step);
router.setError(errors);
if (source == null)
if (source == null) {
logger.debug("Initializing source as ConsumerProxy<Object> of type " + router.getClass().getName());
source = new ConsumerProxy<Object>(router);
}
if (previous != null)
previous.setOutput(new ConsumerProxy<Object>(router));
messageRouters[index++] = router;
......
......@@ -29,6 +29,7 @@ import org.xml.sax.SAXException;
import com.indexdata.masterkey.localindices.harvest.job.StorageJobLogger;
import com.indexdata.masterkey.localindices.harvest.storage.RecordJSON;
import com.indexdata.masterkey.localindices.harvest.storage.StorageException;
import com.indexdata.masterkey.localindices.util.MarcToJson;
import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
/**
......@@ -685,6 +686,7 @@ import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
return itemResponse;
}
/**
* Get items for a holdings record
* @param holdingsRecordId
......@@ -762,10 +764,17 @@ import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
JSONObject marcJson = null;
if (record.getOriginalContent() != null) {
try {
logger.log(Level.TRACE,"Incoming original content: " + new String(record.getOriginalContent(), "UTF-8"));
marcJson = MarcXMLToJson.convertMarcXMLToJson(new String(record.getOriginalContent(), "UTF-8"));
logger.log(Level.TRACE, "Original content converted to JSON: " + marcJson.toJSONString());
String originalContentString = new String(record.getOriginalContent(), "UTF-8");
if(originalContentString.startsWith("<") ) {
logger.log(Level.TRACE,"Treating source record as XML");
marcJson = MarcXMLToJson.convertMarcXMLToJson(originalContentString);
} else {
logger.log(Level.TRACE,"Treating source record as ISO2079");
marcJson = MarcToJson.convertMarcRecordsToJson(originalContentString).get(0);
}
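// Note: the leading '<' check above is a simple format heuristic; original content
// that looks like XML is converted with MarcXMLToJson, while anything else is
// assumed to be binary ISO2709 MARC and converted via the new MarcToJson utility.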
logger.debug(marcJson.toJSONString());
} catch (IOException | ParserConfigurationException | SAXException e) {
updateCounters.sourceRecordsFailed++;
RecordError error = new ExceptionRecordError(e, "Error creating MARC JSON for source record", "MARC source");
recordWithErrors.reportAndThrowError(error, Level.DEBUG);
......@@ -995,7 +1004,6 @@ import com.indexdata.masterkey.localindices.util.MarcXMLToJson;
logger.log(Level.TRACE, "Delete request received: " + transformedRecord.getDelete().toJSONString());
JSONObject deletionJson = transformedRecord.getDelete();
if (ctxt.useInventoryUpsert) {
logger.info("TODO: Implement delete signal to Inventory upsert: " + deletionJson.toJSONString());
logger.info("Sending delete request to " + ctxt.inventoryUpsertUrl);
//HttpEntityEnclosingRequestBase httpDelete = new HttpEntityEnclosingRequestBase(ctxt.inventoryUpsertUrl);
HttpDeleteWithBody httpDelete = new HttpDeleteWithBody(ctxt.inventoryUpsertUrl);
......
package com.indexdata.masterkey.localindices.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.ParserConfigurationException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.marc4j.MarcXmlWriter;
import org.marc4j.MarcReader;
import org.marc4j.MarcStreamReader;
import org.marc4j.MarcWriter;
import org.marc4j.marc.Record;
import org.xml.sax.SAXException;
public class MarcToJson {
public static List<JSONObject> convertMarcRecordsToJson(InputStream inputStream)
throws UnsupportedEncodingException, SAXException, IOException,
ParserConfigurationException {
List<JSONObject> jsonList = new ArrayList<>();
MarcReader reader = new MarcStreamReader(inputStream);
while(reader.hasNext()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Record record = reader.next();
MarcWriter writer = new MarcXmlWriter(baos);
writer.write(record);
writer.close();
System.out.print(baos.toString("UTF-8"));
System.out.flush();
JSONObject json = MarcXMLToJson.convertMarcXMLToJson(baos.toString("UTF-8"));
jsonList.add(json);
}
return jsonList;
}
public static List<JSONObject> convertMarcRecordsToJson(String marcString)
throws UnsupportedEncodingException, SAXException, IOException, ParserConfigurationException {
return convertMarcRecordsToJson(new ByteArrayInputStream(marcString.getBytes()));
}
}
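A minimal usage sketch for the new MarcToJson utility (the demo class, its main method,
and the "records.mrc" file name are hypothetical illustrations, not part of this change):

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.List;
import org.json.simple.JSONObject;
import com.indexdata.masterkey.localindices.util.MarcToJson;

public class MarcToJsonDemo {
  public static void main(String[] args) throws Exception {
    // Read binary ISO2709 MARC records and print each one as JSON.
    try (InputStream in = new FileInputStream("records.mrc")) {
      List<JSONObject> records = MarcToJson.convertMarcRecordsToJson(in);
      for (JSONObject record : records) {
        System.out.println(record.toJSONString());
      }
    }
  }
}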
......@@ -21,11 +21,6 @@ import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
/**
*
* @author kurt
*/
public class MarcXMLToJson {
public static JSONObject convertMarcXMLToJson(String marcXML)
......
......@@ -10,6 +10,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.StringWriter;