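I have a face recognition project that uses Kafka, TensorFlow and OpenCV. NiFi's ExecuteScript processor runs Python through Jython, which cannot load native CPython libraries such as TensorFlow and OpenCV, so I can't run a plain Python script there. Instead, I decided to use ExecuteProcess and ExecuteStreamCommand, which launch the script as an external process.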
1. List objects from S3
I wrote a custom script, run with ExecuteProcess, that lists the objects under the previous hour's prefix in S3 and prints one object key per line:
```python
import sys
from datetime import datetime, timedelta

import boto
import boto.s3.connection


def get_img_directory():
    # Images are stored under <year>/<month>/<day>/<hour>; list the previous hour
    last_hour_date_time = datetime.now() - timedelta(hours=1)
    return last_hour_date_time.strftime('%Y/%m/%d/%H')


if __name__ == '__main__':
    access_key = '<access key>'
    secret_key = '<secret key>'
    conn = boto.connect_s3(aws_access_key_id=access_key,
                           aws_secret_access_key=secret_key,
                           host='<host>',
                           is_secure=False,
                           calling_format=boto.s3.connection.OrdinaryCallingFormat(),
                           )
    boto2_bucket = conn.get_bucket('crawled-img', validate=False)
    prefix = get_img_directory()
    # Log the prefix to stderr so stdout contains only object keys for NiFi
    sys.stderr.write(prefix + '\n')
    # One key per line; NiFi splits this output and pushes it to Kafka
    for key in boto2_bucket.list(prefix):
        print(key.name)
```
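`get_img_directory` depends on the wall clock, which makes it awkward to check. A minimal sketch, assuming we refactor it to accept an optional `now` argument (a hypothetical change, not part of the original script), shows which prefix a run would list, including the rollover to the previous day just after midnight:

```python
from datetime import datetime, timedelta


def get_img_directory(now=None):
    """Return the S3 prefix for the hour before `now` (defaults to the current time)."""
    if now is None:
        now = datetime.now()
    last_hour_date_time = now - timedelta(hours=1)
    return last_hour_date_time.strftime('%Y/%m/%d/%H')


# A run at 00:30 on 1 May 2019 lists the 23:00 directory of 30 April
print(get_img_directory(datetime(2019, 5, 1, 0, 30)))  # prints 2019/04/30/23
```

Injecting the time keeps the production behaviour unchanged (`get_img_directory()` still uses the current time) while making the hour arithmetic easy to test.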
2. Parse the output of step 1 and split it into one FlowFile per line with the SplitText processor
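ExecuteProcess captures the script's stdout as FlowFile content, and SplitText (with Line Split Count set to 1, an assumed setting) turns that listing into one FlowFile per object key. The function below is only a rough Python model of that split for illustration, not NiFi code; `split_listing` is a hypothetical name, and skipping blank lines is an assumption about what we want downstream:

```python
def split_listing(stdout_text):
    """Split a newline-separated key listing into one record per key.

    Roughly mirrors SplitText with Line Split Count = 1. Each line is trimmed
    of surrounding whitespace, and whitespace-only lines are skipped.
    """
    return [line.strip() for line in stdout_text.splitlines() if line.strip()]


listing = "2019/04/30/23/cam1.jpg\n2019/04/30/23/cam2.jpg\n"
for record in split_listing(listing):
    print(record)  # each record would become its own FlowFile / Kafka message
```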
3. Push the output to Kafka
4. Consume data from Kafka
5. Extract the content of the Kafka message into a FlowFile attribute
A plain Python script launched by NiFi cannot access FlowFiles through NiFi's API, so we copy the FlowFile content into an attribute and send the FlowFile to ExecuteStreamCommand. ExecuteStreamCommand receives the FlowFile, reads the content back from the attribute and passes it to the script as an argument.
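We use the ExtractText processor and add a new property named “FlowContent”; ExtractText stores the text matched by the property's regular expression in an attribute with that name.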
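On the script side, the attribute arrives as a command-line argument, for example if ExecuteStreamCommand's Command Arguments property is set to `${FlowContent}` (an assumed configuration; the exact settings are not shown here). ExecuteStreamCommand also writes the incoming FlowFile content to the command's stdin unless Ignore STDIN is enabled, so a script can read it from there as well. A minimal sketch that accepts either source, using the hypothetical helper name `read_object_key` and a hypothetical script name:

```python
import io
import sys


def read_object_key(argv, stdin=None):
    """Return the S3 object key passed by NiFi.

    Prefers the first command-line argument (e.g. ${FlowContent});
    falls back to the FlowFile content piped to stdin.
    """
    if len(argv) > 1 and argv[1].strip():
        return argv[1].strip()
    stream = stdin if stdin is not None else sys.stdin
    return stream.read().strip()


# Demo with simulated inputs; the real script would call read_object_key(sys.argv)
print(read_object_key(["face_recognition.py", "2019/04/30/23/cam1.jpg"]))
print(read_object_key(["face_recognition.py"], io.StringIO("2019/04/30/23/cam2.jpg\n")))
```

Reading stdin avoids the ExtractText step entirely, while the argument route matches the attribute-based flow described above.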
6. Process the data with ExecuteStreamCommand
ExecuteStreamCommand is the last processor in the flow. It takes the image's S3 URL, downloads the image data and then runs the custom face recognition script.
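With `OrdinaryCallingFormat` and `is_secure=False`, as in the listing script, boto addresses objects path-style, i.e. `http://<host>/<bucket>/<key>`. A minimal sketch of the URL and download step, assuming the objects can be fetched with a plain unauthenticated GET (for private buckets, boto's `bucket.get_key(key).get_contents_as_string()` would be the authenticated route). `build_object_url`, `download_image` and the host `s3.example.local` are hypothetical placeholders:

```python
from urllib.parse import quote
from urllib.request import urlopen


def build_object_url(host, bucket, key, secure=False):
    """Build a path-style S3 URL (the layout OrdinaryCallingFormat uses)."""
    scheme = 'https' if secure else 'http'
    # Keep '/' unescaped so the hour directories stay part of the path
    return '{}://{}/{}/{}'.format(scheme, host, bucket, quote(key, safe='/'))


def download_image(url, timeout=30):
    """Fetch the raw image bytes; assumes the object is publicly readable."""
    with urlopen(url, timeout=timeout) as response:
        return response.read()


print(build_object_url('s3.example.local', 'crawled-img', '2019/04/30/23/cam 1.jpg'))
# prints http://s3.example.local/crawled-img/2019/04/30/23/cam%201.jpg
```

The downloaded bytes can then be decoded for OpenCV with `cv2.imdecode` before running face recognition.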
7. Push the face data to Kafka
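One way to hand the results back to NiFi is to print them as JSON on stdout: ExecuteStreamCommand routes the command's output to its `output stream` relationship, from which a Kafka publishing processor can send it on. A minimal sketch, assuming a hypothetical message layout (`key`, `face_count`, `faces` with pixel boxes); the real script's output format is not shown:

```python
import json


def build_face_message(object_key, faces):
    """Serialise the detected faces for one image as an ASCII JSON message.

    `faces` is a list of (top, right, bottom, left) pixel boxes; this layout
    is a hypothetical example, not the original script's format.
    """
    message = {
        'key': object_key,
        'face_count': len(faces),
        'faces': [
            {'top': top, 'right': right, 'bottom': bottom, 'left': left}
            for (top, right, bottom, left) in faces
        ],
    }
    return json.dumps(message).encode('ascii')


payload = build_face_message('2019/04/30/23/cam1.jpg', [(10, 90, 110, 20)])
print(payload.decode('ascii'))  # stdout becomes the "output stream" FlowFile
```

Keeping one JSON document per image means each Kafka message stays self-describing, with the source key alongside the detected faces.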