Run pure Python Program In Nifi

DPBD90
2 min readAug 8, 2019

--

I have a face recognition project which use Kafka, Tensorflow, OpenCV. With limitation of Nifi Processor, i can't run pure Python script in ExecuteScript. So i decide to use ExecuteProcess and ExecuteStreamCommand to resolve my problem.

  1. List object from S3

I write custom script to list object from S3 to match my requirement

import boto.s3.connection
from datetime import datetime, timedelta

UNK = "Unknown"
from kafka import KafkaProducer
import json


def get_img_directory():
last_hour_date_time = datetime.now() - timedelta(hours=1)
return last_hour_date_time.strftime('%Y/%m/%d/%H')


if __name__ == '__main__':
access_key = '<access key>'
secret_key = '<secret key>'

conn = boto.connect_s3(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
host='<host>',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat(),
)
producer = KafkaProducer(bootstrap_servers=['<kafka-broker>:9092'],
value_serializer=lambda m: json.dumps(m).encode('ascii'))
created=bucket.creation_date))
boto2_bucket = conn.get_bucket('crawled-img', validate=False)

dir = get_img_directory()
print(dir)
i=0
for file in boto2_bucket.list(dir):
print(file.name)

2. Parsing output from step one and convert to FlowFile with SplitText Processor

3. Pushing output to Kafka

4. Consume Data From Kafka

5. Extract content from Kafka Message and convert to FlowFile Attribute

By default, pure python script can’t read Nifi FlowFile. So we must convert content of Flowfile to attribute and send to ExecuteStreamCommand. ExecuteStreamCommand will recieve this FlowFile, extract content from flowfile’s attribute and use as it’s argument.

We use ExtractText Processor and add new attribute name “FlowContent”

6. Process data with ExecuteStreamCommand

We finish the flow with lastest processor. We use ExecuteStreamCommand to get image s3 urrl then download it’s data. Alter that we run custom script for face recognition:

7. Push face data to Kafka

--

--

DPBD90
DPBD90

Written by DPBD90

I'm an engineer. I love to work on data and open-source systems.

No responses yet