Stream caching

While stream types (like StreamSource, InputStream and Reader) are commonly used in messaging for performance reasons, they also have an important drawback: they can only be read once. To work with the message content multiple times, the stream needs to be cached.
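
The read-once behaviour can be seen with plain JDK classes. The following is a minimal sketch that does not use Camel at all; the class name ReadOnceDemo and the helper drain are made up for illustration. It drains a stream twice and shows that only a cached copy of the bytes can be read again.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Camel.
final class ReadOnceDemo {

    // Reads everything that is left in the stream and returns it as UTF-8 text.
    static String drain(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));

        String first = drain(body);   // consumes the stream
        String second = drain(body);  // the stream is already at its end

        System.out.println("first read:  '" + first + "'");
        System.out.println("second read: '" + second + "'");

        // Keeping a copy of the bytes is what makes the content re-readable:
        // every new stream over the cached bytes starts from the beginning.
        byte[] cached = first.getBytes(StandardCharsets.UTF_8);
        System.out.println("from cache:  '" + drain(new ByteArrayInputStream(cached)) + "'");
        System.out.println("from cache:  '" + drain(new ByteArrayInputStream(cached)) + "'");
    }
}
```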

Streams are cached in memory. However, for large stream messages, you can set spoolEnabled=true, and then large messages (over 128 KB) will be cached in a temporary file instead. Camel itself handles deleting the temporary file once the cached stream is no longer necessary.
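
The idea of keeping small payloads in memory and spooling large ones to a temporary file can be sketched with the JDK alone. This is an illustration under stated assumptions, not Camel's implementation: the names SpoolSketch, Cached, cache, read and release are hypothetical, the sketch reads the whole payload before deciding (a real cache would switch to the file while copying), and the strict "greater than" comparison is an assumption.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch, not part of Camel.
final class SpoolSketch {

    // Holds the cached content either in memory or in a temporary file.
    static final class Cached {
        final byte[] inMemory;  // non-null when the content is kept in memory
        final Path spoolFile;   // non-null when the content was spooled to disk

        Cached(byte[] inMemory, Path spoolFile) {
            this.inMemory = inMemory;
            this.spoolFile = spoolFile;
        }

        // Can be called any number of times; each call returns the full content.
        byte[] read() {
            try {
                return inMemory != null ? inMemory.clone() : Files.readAllBytes(spoolFile);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        // Deletes the temporary file, if any, once the cache is no longer needed.
        void release() {
            try {
                if (spoolFile != null) {
                    Files.deleteIfExists(spoolFile);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Caches the stream in memory, or in a temp file when it is larger than the threshold.
    static Cached cache(InputStream in, long thresholdBytes) {
        try {
            byte[] data = in.readAllBytes(); // simplification: read everything first
            if (thresholdBytes <= 0 || data.length <= thresholdBytes) {
                return new Cached(data, null);
            }
            Path file = Files.createTempFile("spool-sketch-", ".tmp");
            Files.write(file, data);
            return new Cached(null, file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling release() in this sketch plays the role of the cleanup that Camel performs on its own for spooled streams.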

StreamCache - Affecting the message payload

The StreamCache will affect your payload object, as it replaces the stream payload with an org.apache.camel.StreamCache object. This StreamCache is re-readable, so the message can be routed within Camel using redelivery, the Content Based Router, and the like.

To determine whether a message payload requires caching, Camel uses the Type Converter functionality to check whether the message payload type can be converted into an org.apache.camel.StreamCache instance.

The classes in the Camel release that implement org.apache.camel.StreamCache are NOT intended for end users to instantiate; they are part of Camel's stream-caching functionality.

Configuring Stream Caching

Stream caching is configured using org.apache.camel.spi.StreamCachingStrategy.

The strategy has the following options:

Option Default Description

enabled

true

Whether stream caching is enabled

allowClasses

To filter stream caching of a given set of allowed/denied classes. By default, all classes that are java.io.InputStream are allowed. Multiple class names can be separated by comma.

denyClasses

To filter stream caching of a given set of allowed/denied classes. By default, all classes that are java.io.InputStream are allowed. Multiple class names can be separated by comma.

spoolEnabled

false

Whether spool to disk is enabled

spoolDirectory

${java.io.tmpdir}/camel/camel-tmp-#uuid#

Base directory where temporary files for spooled streams should be stored. This option supports naming patterns as documented below.

spoolCipher

null

If set, the temporary files are encrypted using the specified cipher transformation (i.e., a valid stream or 8-bit cipher name such as "RC4" or "AES/CTR/NoPadding"). An empty name "" is treated as null.

spoolThreshold

128 KB

Size in bytes when the stream should be spooled to disk instead of being kept in memory. Use a value of 0 or negative to disable it altogether, so streams are always kept in memory regardless of their size.

spoolUsedHeapMemoryThreshold

0

A percentage (1 to 99) of current used heap memory to use as threshold for spooling streams to disk. The upper bound is based on heap committed (guaranteed memory the JVM can claim). This can be used to spool to disk when running low on memory.

spoolUsedHeapMemoryLimit

Max

If spoolUsedHeapMemoryThreshold is in use, then whether the used heap memory upper limit is either Max or Committed.

anySpoolRules

false

Whether any or all SpoolRules must return true to determine if the stream should be spooled or not. This can be used to apply AND/OR binary logic to all the rules. By default it’s AND based.

bufferSize

4096

Sets the buffer size to use when allocating in-memory buffers used for in-memory stream caches.

removeSpoolDirectoryWhenStopping

true

Whether to remove the spool directory when stopping CamelContext.

statisticsEnabled

false

Whether utilization statistics is enabled. By enabling this you can see these statistics, for example with JMX.

SpoolDirectory naming pattern

The following patterns are supported:

  • #uuid# = a random UUID

  • #camelId# = the CamelContext id (e.g. the name)

  • #name# = same as #camelId#

  • #counter# = an incrementing counter

  • #bundleId# = the OSGi bundle id (only for OSGi environments)

  • #symbolicName# = the OSGi symbolic name (only for OSGi environments)

  • #version# = the OSGi bundle version (only for OSGi environments)

  • ${env:key} = the environment variable with the key

  • ${key} = the JVM system property with the key

A couple of examples:

To store in the java temp directory with a sub directory using the CamelContext name:

context.getStreamCachingStrategy().setSpoolDirectory("${java.io.tmpdir}#name#/");

To store in the KARAF_HOME/tmp/bundleId directory:

context.getStreamCachingStrategy().setSpoolDirectory("${env:KARAF_HOME}/tmp/bundle#bundleId#");
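
To make the pattern rules concrete, here is a rough sketch of how such placeholders could be expanded. It is not Camel's resolver: the class SpoolDirPattern and its resolve method are hypothetical, the OSGi placeholders are left out, and the environment variables and system properties are passed in as maps (an assumption made so the example is deterministic).

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not part of Camel.
final class SpoolDirPattern {

    private static final AtomicLong COUNTER = new AtomicLong();

    // Matches ${key} and ${env:key}; group 1 is "env:" when present, group 2 is the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(env:)?([^}]+)\\}");

    static String resolve(String pattern, String camelId,
                          Map<String, String> env, Map<String, String> sysProps) {
        String s = pattern;
        if (s.contains("#uuid#")) {
            s = s.replace("#uuid#", UUID.randomUUID().toString());
        }
        // #name# is documented as being the same as #camelId#.
        s = s.replace("#camelId#", camelId).replace("#name#", camelId);
        if (s.contains("#counter#")) {
            s = s.replace("#counter#", String.valueOf(COUNTER.incrementAndGet()));
        }

        Matcher m = PLACEHOLDER.matcher(s);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String key = m.group(2);
            String value = m.group(1) != null ? env.get(key) : sysProps.get(key);
            // Assumption: unknown keys are left in place unchanged.
            m.appendReplacement(out, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With a system property java.io.tmpdir of /tmp/ and a context named myCamel, the first pattern above would expand to /tmp/myCamel/ in this sketch.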

Configuring StreamCachingStrategy in Java

You can configure the StreamCachingStrategy in Java as shown below:

context.getStreamCachingStrategy().setSpoolEnabled(true);
context.getStreamCachingStrategy().setSpoolDirectory("/tmp/cachedir");
context.getStreamCachingStrategy().setSpoolThreshold(64 * 1024);
context.getStreamCachingStrategy().setBufferSize(16 * 1024);
// to enable encryption using RC4
// context.getStreamCachingStrategy().setSpoolCipher("RC4");

And remember to enable stream caching on the CamelContext:

context.setStreamCaching(true);

or on routes:

from("file:inbox")
  .streamCaching()
  .to("bean:foo");

Configuring StreamCachingStrategy in XML

In XML you can enable stream caching on the <camelContext> and then do the configuration in the streamCaching element:

<camelContext streamCache="true">

  <streamCaching id="myCacheConfig" bufferSize="16384" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolThreshold="65536"/>

  <route>
    <from uri="direct:c"/>
    <to uri="mock:c"/>
  </route>

</camelContext>

Using spoolUsedHeapMemoryThreshold

By default, stream caching will spool only big payloads (128 KB or bigger) to disk. However, you can also set the spoolUsedHeapMemoryThreshold option, which is a percentage of used heap memory. This can be used to also spool to disk when running low on memory.

For example with:

<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolUsedHeapMemoryThreshold="70"/>

Notice that because spoolThreshold is enabled by default at 128 KB, both thresholds are in use (spoolThreshold and spoolUsedHeapMemoryThreshold). In this example, we only spool to disk if the payload is > 128 KB and the used heap memory is > 70%. The reason is that the option anySpoolRules is false by default, which means both rules must be true (i.e. AND).

If we want to spool to disk when either of the rules is true (i.e. OR), then we can do:

<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolUsedHeapMemoryThreshold="70" anySpoolRules="true"/>

If we only want to spool to disk when we run low on memory, then we can set:

<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolThreshold="-1" spoolUsedHeapMemoryThreshold="70"/>

Then the spoolThreshold rule is not used, and only the heap memory based rule is in use.
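
The three configurations above can be summarised as a small decision function. This is only a sketch of the AND/OR logic as described in this section, not Camel's code: the class SpoolRuleSketch, its methods, and the usedHeapPercent supplier (a stand-in for a real heap measurement) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;
import java.util.function.LongPredicate;

// Hypothetical sketch, not part of Camel.
final class SpoolRuleSketch {

    // Builds the active rules; a threshold of 0 or less leaves that rule out entirely.
    static List<LongPredicate> rules(long spoolThreshold, int heapThresholdPercent,
                                     IntSupplier usedHeapPercent) {
        List<LongPredicate> rules = new ArrayList<>();
        if (spoolThreshold > 0) {
            // Size rule: the payload is larger than the threshold.
            rules.add(length -> length > spoolThreshold);
        }
        if (heapThresholdPercent > 0) {
            // Heap rule: used heap is above the configured percentage.
            rules.add(length -> usedHeapPercent.getAsInt() > heapThresholdPercent);
        }
        return rules;
    }

    // anySpoolRules=false means every rule must agree (AND); true means one is enough (OR).
    static boolean shouldSpool(List<LongPredicate> rules, boolean anySpoolRules, long length) {
        if (rules.isEmpty()) {
            return false;
        }
        return anySpoolRules
                ? rules.stream().anyMatch(rule -> rule.test(length))
                : rules.stream().allMatch(rule -> rule.test(length));
    }
}
```

For instance, with a 200 KB payload and the heap at 50%, the default AND mode does not spool in this sketch, while anySpoolRules=true does, because the size rule alone is satisfied.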

By default, the upper limit of the used heap memory is based on the maximum heap size. You can also configure the committed heap size as the upper limit, using the spoolUsedHeapMemoryLimit option as shown below:

<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolUsedHeapMemoryThreshold="70" spoolUsedHeapMemoryLimit="Committed"/>
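
The difference between the two limits can be observed with the JDK's memory MXBean. The snippet below is a sketch of how a used-heap percentage might be computed against either limit; it is not taken from Camel, and the class HeapUsageSketch and its methods are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical sketch, not part of Camel.
final class HeapUsageSketch {

    // Integer percentage of 'used' relative to 'upperLimit'.
    static int usedPercent(long used, long upperLimit) {
        return (int) (used * 100 / upperLimit);
    }

    // Current used heap as a percentage of the committed or the maximum heap size.
    static int currentUsedPercent(boolean useCommitted) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long limit = useCommitted ? heap.getCommitted() : heap.getMax();
        // getMax() returns -1 when no maximum is defined; fall back to committed.
        if (limit <= 0) {
            limit = heap.getCommitted();
        }
        return usedPercent(heap.getUsed(), limit);
    }

    public static void main(String[] args) {
        System.out.println("used vs max:       " + currentUsedPercent(false) + "%");
        System.out.println("used vs committed: " + currentUsedPercent(true) + "%");
    }
}
```

Because the committed size is never larger than a defined maximum, the percentage measured against the committed heap is the same or higher, so a threshold of 70 is reached sooner with the Committed limit.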

Using custom SpoolRule implementations

You can implement your own custom rules to determine if the stream should be spooled to disk. This can be done by implementing the interface org.apache.camel.spi.StreamCachingStrategy.SpoolRule, which has a single method:

boolean shouldSpoolCache(long length);

The length is the length of the stream. To use the rule, add it to the StreamCachingStrategy as shown below:

SpoolRule mySpoolRule = ...
context.getStreamCachingStrategy().addSpoolRule(mySpoolRule);

And from XML you need to define a <bean> with your custom rule:

<bean id="mySpoolRule" class="com.foo.MySpoolRule"/>

<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolRules="mySpoolRule"/>

This uses the spoolRules attribute on <streamCaching>. If you have more rules, separate them with commas:

<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolRules="mySpoolRule,myOtherSpoolRule"/>

Using StreamCachingProcessor

Since Camel 4.11, this processor can be used to convert the current message body to a StreamCache. This allows the body to be re-read multiple times, and the processor can be placed at any point in a Camel route.

from("direct:start")
    .process(new StreamCachingProcessor())
    .to("log:cached");