When running Camel in ServicemMix on linux, the only reliable readLock the we use is "changed", but if the writer is writing very slowly, the defaut time 1s between checks is not sufficient. In the end, we have to implement a org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy to provide to exclusiveReadLockStrategy, This class is basically the same as FileChangedExclusiveReadLockStrategy but with a longer sleep time.

I have made a patch to provide and option for user to specify the time between checks

the patch is at: https://issues.apache.org/jira/browse/CAMEL-3520 but it's only available in camel 2.6.

For older camel versions, we have to provide a  exclusiveReadLockStrategy as followed:

 

[code]


import java.io.File;
import java.io.IOException;

import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.file.strategy.FileChangedExclusiveReadLockStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class ChangedFileExclusiveReadLockStrategy extends FileChangedExclusiveReadLockStrategy {
   

       private static final transient Log LOG = LogFactory.getLog(ChangedFileExclusiveReadLockStrategy.class);
        private long timeout;

        @Override
        public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
            // noop
        }

        public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
            File target = new File(file.getAbsoluteFilePath());
            boolean exclusive = false;

            if (LOG.isTraceEnabled()) {
                LOG.trace("Waiting for exclusive read lock to file: " + file);
            }

            try {
                long lastModified = Long.MIN_VALUE;
                long length = Long.MIN_VALUE;
                long start = System.currentTimeMillis();
                //StopWatch watch = new StopWatch();

                while (!exclusive) {
                    // timeout check
                    if (timeout > 0) {
                         long delta = System.currentTimeMillis() - start;
                        if (delta > timeout) {
                            LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
                            // we could not get the lock within the timeout period, so return false
                            return false;
                        }
                    }

                    long newLastModified = target.lastModified();
                    long newLength = target.length();

                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
                        LOG.trace("Previous length: " + length + ", new length: " + newLength);
                    }

                    if (newLastModified == lastModified && newLength == length) {
                        // let super handle the last part of acquiring the lock now the file is not
                        // currently being in progress of being copied as file length and modified
                        // are stable
                        exclusive = super.acquireExclusiveReadLock(operations, file, exchange);
                    } else {
                        // set new base file change information
                        lastModified = newLastModified;
                        length = newLength;

                        boolean interrupted = sleep();
                        if (interrupted) {
                            // we were interrupted while sleeping, we are likely being shutdown so return false
                            return false;
                        }
                    }
                }
            } catch (IOException e) {
                // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
                // such as AntiVirus or MS Office that has special locks for it's supported files
                if (timeout == 0) {
                    // if not using timeout, then we cant retry, so rethrow
                    throw e;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cannot acquire read lock. Will try again.", e);
                }
                boolean interrupted = sleep();
                if (interrupted) {
                    // we were interrupted while sleeping, we are likely being shutdown so return false
                    return false;
                }
            }

            return exclusive;
        }

        private boolean sleep() {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Exclusive read lock not granted. Sleeping for 10000 millis.");
            }
            try {
                Thread.sleep(10000);
                return false;
            } catch (InterruptedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
                }
                return true;
            }
        }

        public long getTimeout() {
            return timeout;
        }

        public void setTimeout(long timeout) {
            this.timeout = timeout;
        }
}

[/code]

 

Go to top
JSN Boot template designed by JoomlaShine.com