-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkinesis_reader.py
More file actions
46 lines (41 loc) · 1.64 KB
/
kinesis_reader.py
File metadata and controls
46 lines (41 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python
"""
Usage: kinesis_reader.py STREAM_NAME [--limit=LIMIT] [--timedelta=MINUTES]
This script pulls records from an AWS Kinesis stream for debugging purposes.
"""
import click
import boto3
import time
import json
import datetime
import time
@click.command()
@click.argument('stream_name', required=True)
@click.option('--limit', default=2, help="Number of records to pull from each available shard. Default is 2.")
@click.option('--timedelta', default=5, help="Number of minutes to look back in shard. Used with 'AT_TIMESTAMP'. Default is 5.")
def get_stream_data(stream_name, limit, timedelta):
    """Print recent records from every shard of the Kinesis stream STREAM_NAME.

    For each shard, reading starts --timedelta minutes in the past and stops
    once the first non-empty batch (at most --limit records) has been printed.
    """
    # NOTE: click renders the docstring above as this command's --help text,
    # so it is deliberately written for the command-line user.

    if not stream_name:
        # click already rejects a missing argument; this only catches an
        # explicitly empty string.
        print("Need stream name !!!")
        return

    client = boto3.client('kinesis')
    # NOTE(review): describe_stream can report the shard list in pages
    # (HasMoreShards); only the first page is read here - confirm this is
    # acceptable for the streams being debugged.
    stream = client.describe_stream(StreamName=stream_name)['StreamDescription']

    for shard in stream['Shards']:
        shard_id = shard['ShardId']
        print("### %s - %s" % (stream_name, shard_id))

        shard_iterator = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='AT_TIMESTAMP',  # alternatives: 'TRIM_HORIZON' | 'LATEST'
            Timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=timedelta),
        )['ShardIterator']

        # Poll until this shard yields a batch. An empty response does not
        # mean the shard is empty, so each retry must continue from the
        # iterator returned by the previous call instead of re-reading the
        # same position with the original one.
        while shard_iterator:
            out = client.get_records(ShardIterator=shard_iterator, Limit=limit)
            records = out["Records"]
            if records:
                for record in records:
                    try:
                        data = json.loads(record["Data"])
                    except ValueError:
                        # Payload is not JSON (or not decodable text): show
                        # it raw rather than aborting the whole dump.
                        data = record["Data"]
                    print(data)
                break

            # Nothing in this batch: dump the raw response for debugging.
            print(out)
            # A missing/None NextShardIterator means there is nothing further
            # to read from this shard, which ends the loop.
            shard_iterator = out.get("NextShardIterator")
            if shard_iterator:
                # Pause between polls so an idle shard is not hammered.
                time.sleep(1)
# Run the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    get_stream_data()