Setting an Attribute with a Dynamic Property Name

  • 28 April, 2021
  • By Dave Cassel

The NiFi EvaluateJsonPath processor lets you specify a JSON path, which is applied to the content of a flow file. For most situations that works great, but what if the property you want to read depends on a flow file attribute? The JSON paths are defined in EvaluateJsonPath’s dynamic properties, and those properties don’t support the Expression Language, so the path can’t refer to an attribute.

As an example, suppose I have a flow that gets JSON content with different structures. Somewhere in my flow I’ll set an attribute saying which JSON property to read to get a value. We won’t be able to use EvaluateJsonPath, since the path has to be static. ExecuteScript to the rescue!

The ExecuteScript processor is a great way to handle situations that the out-of-the-box processors don’t support. (You also have the option to write a custom processor, but that’s much more complicated.)

Here’s my setup for testing:

The GenerateFlowFile processor has some JSON for the content:

{
  "keyProperty": 1,
  "anotherKeyProperty": "some value"
}

It also sets two attributes:

  • propName with the value of “keyProperty”
  • anotherProp with the value of “anotherKeyProperty”
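
In plain JavaScript terms, the lookup we want is a bracket-notation property access where the property name comes from an attribute. Here is a minimal sketch outside NiFi, assuming a plain object (the hypothetical attributes variable) stands in for the flow file attributes:

```javascript
// Content and attributes mirroring the GenerateFlowFile setup above.
var content = JSON.parse('{"keyProperty": 1, "anotherKeyProperty": "some value"}');
// Assumption: a plain object stands in for the flow file attributes.
var attributes = { propName: 'keyProperty', anotherProp: 'anotherKeyProperty' };

// Bracket notation accepts a property name computed at runtime,
// which is what a static JSON path cannot express.
var first = content[attributes.propName];
var second = content[attributes.anotherProp];

console.log(first);  // 1
console.log(second); // some value
```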

The ExecuteScript processor uses ECMAScript and the following code:

/* Add objects to the mapping to indicate which attribute holds
 * the name of the JSON property to read (sourceAttribute) and
 * which attribute the value is written to (targetAttribute).
 */
var mapping = [
  { 
    sourceAttribute: 'propName', 
    targetAttribute: 'myNewProperty' 
  },
  { 
    sourceAttribute: 'anotherProp', 
    targetAttribute: 'someOtherProperty' 
  }
]; 


var InputStreamCallback =
  Java.type("org.apache.nifi.processor.io.InputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = 
  Java.type("java.nio.charset.StandardCharsets");

var flowFile = session.get();
if (flowFile != null) {
  try {
    var json = null;
    // Create a new InputStreamCallback, passing in a function
    // to define the interface method
    session.read(flowFile,
      new InputStreamCallback(function(inputStream) {
        json = JSON.parse(IOUtils.toString(inputStream, StandardCharsets.UTF_8));
      })
    );
    var attrMap = {};
    mapping.forEach(function(attr) {
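      // Look up the JSON property whose name is stored in the source attribute
      var value = json[flowFile.getAttribute(attr.sourceAttribute)];
      // Keep falsy values such as 0 or false; only missing values become ""
      attrMap[attr.targetAttribute] = (value !== undefined && value !== null) ? String(value) : "";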
    });
    flowFile = session.putAllAttributes(flowFile, attrMap);
    session.transfer(flowFile, REL_SUCCESS);
  }
  catch (e) { 
    // Attribute values must be strings, so convert the error explicitly
    flowFile = session.putAttribute(flowFile, 'dynamicAttributeError', String(e));
    session.transfer(flowFile, REL_FAILURE);
  }
}

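The mapping array at the top of the script is how you configure the processor. Each entry names the attribute that holds the JSON property name (sourceAttribute) and the attribute that receives the value (targetAttribute). With the test setup above, the flow file ends up with myNewProperty set to “1” and someOtherProperty set to “some value”.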
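
To try the mapping logic without a running NiFi instance, the core of the script can be pulled into a standalone function. This is only a sketch: buildAttributes is a hypothetical helper name, and a plain object stands in for the flow file attributes. It treats only missing values as empty, so 0 and false are kept as strings.

```javascript
// Hypothetical helper: same lookup as the script, minus the NiFi session.
// json: parsed flow file content
// attributes: plain object standing in for flow file attributes (assumption)
// mapping: array of { sourceAttribute, targetAttribute } entries
function buildAttributes(json, attributes, mapping) {
  var attrMap = {};
  mapping.forEach(function (attr) {
    var propertyName = attributes[attr.sourceAttribute];
    var value = json[propertyName];
    var missing = value === undefined || value === null;
    attrMap[attr.targetAttribute] = missing ? "" : String(value);
  });
  return attrMap;
}

var result = buildAttributes(
  { keyProperty: 1, anotherKeyProperty: "some value" },
  { propName: "keyProperty", anotherProp: "anotherKeyProperty" },
  [
    { sourceAttribute: "propName", targetAttribute: "myNewProperty" },
    { sourceAttribute: "anotherProp", targetAttribute: "someOtherProperty" },
    // This source attribute is not set, so the target becomes ""
    { sourceAttribute: "notSet", targetAttribute: "emptyProperty" }
  ]
);

console.log(JSON.stringify(result));
// {"myNewProperty":"1","someOtherProperty":"some value","emptyProperty":""}
```

Keeping the lookup in a function like this makes it easy to check new mapping entries against sample content before pasting them into the ExecuteScript processor.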
