From 768306b2e832671f37fb0b1d1a009fcb07807ad3 Mon Sep 17 00:00:00 2001 From: Vishal Karande Date: Wed, 15 Jul 2015 18:13:51 -0700 Subject: [PATCH 1/3] SAMOA-40: Add Kafka stream reader modules to consume data from Kafka framework Apache SAMOA is designed to process streaming data and to develop streaming machine learning algorithms. Currently, the SAMOA framework supports reading stream data from ARFF files only. Thus, when SAMOA is used as a streaming machine learning component in real-time use cases, writing data to and reading data from files is slow and inefficient. A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients. The ability to read data directly from Apache Kafka into SAMOA will not only improve performance but also make SAMOA pluggable into many real-time machine learning use cases such as the Internet of Things (IoT). GOAL: Add code that enables SAMOA to read data from Apache Kafka as stream data. The Kafka stream reader supports the following options for streaming: a) Topic selection - Kafka topic to read data from b) Partition selection - Kafka partition to read data from c) Batching - Number of data instances read from Kafka in one read request d) Configuration options - Kafka port number, seed information, time delay between two read requests Components: KafkaReader - Consists of APIs to read data from Kafka KafkaStream - Stream source for SAMOA providing data read from Kafka Dependencies for Kafka are added to pom.xml in the samoa-api component. 
--- .../current/VERSION | 5 + .../current/finalized/blk_1073741825 | 1 + .../finalized/blk_1073741825_1001.meta | Bin 0 -> 11 bytes .../current/finalized/blk_1073741827 | 1 + .../finalized/blk_1073741827_1003.meta | Bin 0 -> 11 bytes .../dncp_block_verification.log.curr | 0 .../dncp_block_verification.log.prev | 0 .../test/data/dfs/data/data1/current/VERSION | 6 + .../current/VERSION | 5 + .../current/finalized/blk_1073741826 | 1 + .../finalized/blk_1073741826_1002.meta | Bin 0 -> 11 bytes .../current/finalized/blk_1073741828 | 1 + .../finalized/blk_1073741828_1004.meta | Bin 0 -> 11 bytes .../test/data/dfs/data/data2/current/VERSION | 6 + .../build/test/data/dfs/name1/current/VERSION | 7 + ...ts_0000000000000000001-0000000000000000023 | Bin 0 -> 1595 bytes .../name1/current/fsimage_0000000000000000000 | Bin 0 -> 200 bytes .../current/fsimage_0000000000000000000.md5 | 1 + .../test/data/dfs/name1/current/seen_txid | 1 + .../build/test/data/dfs/name2/current/VERSION | 7 + ...ts_0000000000000000001-0000000000000000023 | Bin 0 -> 1595 bytes .../name2/current/fsimage_0000000000000000000 | Bin 0 -> 200 bytes .../current/fsimage_0000000000000000000.md5 | 1 + .../test/data/dfs/name2/current/seen_txid | 1 + samoa-api/pom.xml | 10 + .../samoa/streams/kafka/KafkaReader.java | 279 ++++++++++++++++++ .../samoa/streams/kafka/KafkaStream.java | 201 +++++++++++++ 27 files changed, 534 insertions(+) create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825_1001.meta create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 create mode 100644 
samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827_1003.meta create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.curr create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.prev create mode 100644 samoa-api/build/test/data/dfs/data/data1/current/VERSION create mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION create mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 create mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826_1002.meta create mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 create mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828_1004.meta create mode 100644 samoa-api/build/test/data/dfs/data/data2/current/VERSION create mode 100644 samoa-api/build/test/data/dfs/name1/current/VERSION create mode 100644 samoa-api/build/test/data/dfs/name1/current/edits_0000000000000000001-0000000000000000023 create mode 100644 samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000 create mode 100644 samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 create mode 100644 samoa-api/build/test/data/dfs/name1/current/seen_txid create mode 100644 samoa-api/build/test/data/dfs/name2/current/VERSION create mode 100644 samoa-api/build/test/data/dfs/name2/current/edits_0000000000000000001-0000000000000000023 create mode 100644 samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000 create mode 100644 
samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 create mode 100644 samoa-api/build/test/data/dfs/name2/current/seen_txid create mode 100644 samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java create mode 100644 samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION new file mode 100644 index 00000000..6f667e76 --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION @@ -0,0 +1,5 @@ +#Wed Jul 15 17:02:57 PDT 2015 +namespaceID=1199357958 +cTime=0 +blockpoolID=BP-511502848-10.14.2.232-1437004977072 +layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 new file mode 100644 index 00000000..56a6051c --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 @@ -0,0 +1 @@ +1 \ No newline at end of file diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825_1001.meta b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825_1001.meta new file mode 100644 index 0000000000000000000000000000000000000000..d50ad1a205f828edf1e97876feda8eddf081d8b4 GIT binary patch literal 11 ScmZQzWMW`oVwmuC=3@W{X9EWS literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 
b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 new file mode 100644 index 00000000..e440e5c8 --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 @@ -0,0 +1 @@ +3 \ No newline at end of file diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827_1003.meta b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827_1003.meta new file mode 100644 index 0000000000000000000000000000000000000000..cbade407d6ce22f59bcc66c610116c38f97ad15c GIT binary patch literal 11 ScmZQzWMW`oVkkWKQUm}7M*>0s literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.curr b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.curr new file mode 100644 index 00000000..e69de29b diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.prev b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.prev new file mode 100644 index 00000000..e69de29b diff --git a/samoa-api/build/test/data/dfs/data/data1/current/VERSION b/samoa-api/build/test/data/dfs/data/data1/current/VERSION new file mode 100644 index 00000000..a7666b04 --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data1/current/VERSION @@ -0,0 +1,6 @@ +#Wed Jul 15 17:02:57 PDT 2015 +storageID=DS-1694288922-10.14.2.232-57060-1437004977323 +clusterID=testClusterID +cTime=0 +storageType=DATA_NODE +layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION 
b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION new file mode 100644 index 00000000..6f667e76 --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION @@ -0,0 +1,5 @@ +#Wed Jul 15 17:02:57 PDT 2015 +namespaceID=1199357958 +cTime=0 +blockpoolID=BP-511502848-10.14.2.232-1437004977072 +layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 new file mode 100644 index 00000000..d8263ee9 --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826_1002.meta b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826_1002.meta new file mode 100644 index 0000000000000000000000000000000000000000..b9fd814b2b8a461d2bb77f7e311ee756eccdf014 GIT binary patch literal 11 ScmZQzWMW`oVrX8PB@O@u83C^V literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 new file mode 100644 index 00000000..bf0d87ab --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 @@ -0,0 +1 @@ +4 \ No newline at end of file diff --git 
a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828_1004.meta b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828_1004.meta new file mode 100644 index 0000000000000000000000000000000000000000..f79a87584ad17e32fc9c5c94051b098a3df1c3ac GIT binary patch literal 11 ScmZQzWMW`oVpz)3`yT)V>;gLg literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/data/data2/current/VERSION b/samoa-api/build/test/data/dfs/data/data2/current/VERSION new file mode 100644 index 00000000..a7666b04 --- /dev/null +++ b/samoa-api/build/test/data/dfs/data/data2/current/VERSION @@ -0,0 +1,6 @@ +#Wed Jul 15 17:02:57 PDT 2015 +storageID=DS-1694288922-10.14.2.232-57060-1437004977323 +clusterID=testClusterID +cTime=0 +storageType=DATA_NODE +layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/name1/current/VERSION b/samoa-api/build/test/data/dfs/name1/current/VERSION new file mode 100644 index 00000000..40ed1e62 --- /dev/null +++ b/samoa-api/build/test/data/dfs/name1/current/VERSION @@ -0,0 +1,7 @@ +#Wed Jul 15 17:02:57 PDT 2015 +namespaceID=1199357958 +clusterID=testClusterID +cTime=0 +storageType=NAME_NODE +blockpoolID=BP-511502848-10.14.2.232-1437004977072 +layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/name1/current/edits_0000000000000000001-0000000000000000023 b/samoa-api/build/test/data/dfs/name1/current/edits_0000000000000000001-0000000000000000023 new file mode 100644 index 0000000000000000000000000000000000000000..5f123dabf14053ad350d6f44ba363c47f276e270 GIT binary patch literal 1595 zcmezW|Nlh^1_)qm6FU;f4COFEXa^<+9{t?Ryv&re;*iwh5(WlFzbV>R~V+@a<$Kn-+YW)MO)NZ(Meq@sj@5o(YEf@Wagfa=9+ z_!0&s7q?*NoXpg`l6XIVKVQdyAXk@AXV-XB14|2Y3qxbWctZwGLnCuN10XbH5C~ND zUV6XIBW79Hhx~-_Ei-_o7bom!Sq`)l3|K&<1JD6sn}0o!2MaMUutI3&mkR>KT_vG{ zY}ot&^a4aXLm%@?28RFt|Npzqx^yllR1Z6pX27Nw?td*T{zn)LF#^nIU|@cUJzN&B 
z3C)7p1r0$57BowZPy$^CYAKMWN1#`}pC4SH0CgfbcpMlQnC!m#@WKL}3nIz<>e+)% zJy@V~WAg(nY?y$7{t6Q4ddn63V1dp9wFN!UQT=a*#s3I{Ax6Lg{T22=pLO`JFU&4p z2*ZID&B4Yffo=h{6iCw}(Ca3BTwkF8aTx<2h;{%bA>GM>L9jsQhe$HNzFqL=J1o!z zu=xQNHq5|4e+>!roZ!V%V1X_OF$hSa=5$p51FZ)aA;|tm7z{B27U-|B2l`4|V8Vt1 zAqee&o;6KS0^JE}DUhZ|pf~$@Oqr$taT$X!h<0FLV0l-ecpVn#A`nUDH`$F_f4~A= z6fNMuet?Ay3oy{%Kmz@oP+l1<(8VAI0ZG(ANA3!^=?G~ literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000 b/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000 new file mode 100644 index 0000000000000000000000000000000000000000..4e548f68afe496ffcf56703830a0ba7bf682eab9 GIT binary patch literal 200 zcmezW|NlkzngeVM5Wonbm|vjM5S{~sLLwcY>hTM~W&Z;~J%~U8j2vaziA9NdDXCn= Yr3I-)=|%aa1&nVYda#%eHw={l0QuM?M*si- literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 b/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 new file mode 100644 index 00000000..95a6960e --- /dev/null +++ b/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 @@ -0,0 +1 @@ +5b946996f61c736be7bcef1bb0825e57 *fsimage_0000000000000000000 diff --git a/samoa-api/build/test/data/dfs/name1/current/seen_txid b/samoa-api/build/test/data/dfs/name1/current/seen_txid new file mode 100644 index 00000000..d00491fd --- /dev/null +++ b/samoa-api/build/test/data/dfs/name1/current/seen_txid @@ -0,0 +1 @@ +1 diff --git a/samoa-api/build/test/data/dfs/name2/current/VERSION b/samoa-api/build/test/data/dfs/name2/current/VERSION new file mode 100644 index 00000000..40ed1e62 --- /dev/null +++ b/samoa-api/build/test/data/dfs/name2/current/VERSION @@ -0,0 +1,7 @@ +#Wed Jul 15 17:02:57 PDT 2015 +namespaceID=1199357958 +clusterID=testClusterID +cTime=0 +storageType=NAME_NODE +blockpoolID=BP-511502848-10.14.2.232-1437004977072 +layoutVersion=-47 diff --git 
a/samoa-api/build/test/data/dfs/name2/current/edits_0000000000000000001-0000000000000000023 b/samoa-api/build/test/data/dfs/name2/current/edits_0000000000000000001-0000000000000000023 new file mode 100644 index 0000000000000000000000000000000000000000..5f123dabf14053ad350d6f44ba363c47f276e270 GIT binary patch literal 1595 zcmezW|Nlh^1_)qm6FU;f4COFEXa^<+9{t?Ryv&re;*iwh5(WlFzbV>R~V+@a<$Kn-+YW)MO)NZ(Meq@sj@5o(YEf@Wagfa=9+ z_!0&s7q?*NoXpg`l6XIVKVQdyAXk@AXV-XB14|2Y3qxbWctZwGLnCuN10XbH5C~ND zUV6XIBW79Hhx~-_Ei-_o7bom!Sq`)l3|K&<1JD6sn}0o!2MaMUutI3&mkR>KT_vG{ zY}ot&^a4aXLm%@?28RFt|Npzqx^yllR1Z6pX27Nw?td*T{zn)LF#^nIU|@cUJzN&B z3C)7p1r0$57BowZPy$^CYAKMWN1#`}pC4SH0CgfbcpMlQnC!m#@WKL}3nIz<>e+)% zJy@V~WAg(nY?y$7{t6Q4ddn63V1dp9wFN!UQT=a*#s3I{Ax6Lg{T22=pLO`JFU&4p z2*ZID&B4Yffo=h{6iCw}(Ca3BTwkF8aTx<2h;{%bA>GM>L9jsQhe$HNzFqL=J1o!z zu=xQNHq5|4e+>!roZ!V%V1X_OF$hSa=5$p51FZ)aA;|tm7z{B27U-|B2l`4|V8Vt1 zAqee&o;6KS0^JE}DUhZ|pf~$@Oqr$taT$X!h<0FLV0l-ecpVn#A`nUDH`$F_f4~A= z6fNMuet?Ay3oy{%Kmz@oP+l1<(8VAI0ZG(ANA3!^=?G~ literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000 b/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000 new file mode 100644 index 0000000000000000000000000000000000000000..4e548f68afe496ffcf56703830a0ba7bf682eab9 GIT binary patch literal 200 zcmezW|NlkzngeVM5Wonbm|vjM5S{~sLLwcY>hTM~W&Z;~J%~U8j2vaziA9NdDXCn= Yr3I-)=|%aa1&nVYda#%eHw={l0QuM?M*si- literal 0 HcmV?d00001 diff --git a/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 b/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 new file mode 100644 index 00000000..95a6960e --- /dev/null +++ b/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 @@ -0,0 +1 @@ +5b946996f61c736be7bcef1bb0825e57 *fsimage_0000000000000000000 diff --git a/samoa-api/build/test/data/dfs/name2/current/seen_txid b/samoa-api/build/test/data/dfs/name2/current/seen_txid new file mode 100644 index 
00000000..d00491fd --- /dev/null +++ b/samoa-api/build/test/data/dfs/name2/current/seen_txid @@ -0,0 +1 @@ +1 diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 272b1b9c..30756be2 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -101,6 +101,16 @@ ${hadoop.version} test + + org.apache.kafka + kafka-clients + 0.8.2.1 + + + org.apache.kafka + kafka_2.10 + 0.8.1 + diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java new file mode 100644 index 00000000..8097ddd5 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java @@ -0,0 +1,279 @@ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.*; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.message.MessageAndOffset; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.*; + +public class KafkaReader { + + protected long readOffset; + + private List m_replicaBrokers = new ArrayList(); + + public KafkaReader() { + m_replicaBrokers = new ArrayList(); + readOffset = 0L; + } + + public ArrayList run(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) { + + // find the meta data about the topic and partition we are interested in + String answer = ""; + ArrayList returnInstances = new ArrayList(); + PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); + if (metadata == null) { + System.out.println("Can't find metadata for Topic and Partition. Exiting"); + return null; + } + if (metadata.leader() == null) { + System.out.println("Can't find Leader for Topic and Partition. 
Exiting"); + return null; + } + String leadBroker = metadata.leader().host(); + String clientName = "Client_" + a_topic + "_" + a_partition; + + SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); + //long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); + //readOffset = 0L; + int numErrors = 0; + while (a_maxReads > 0) { + if (consumer == null) { + consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); + } + /** + * Reading data + */ + FetchRequest req = new FetchRequestBuilder() + .clientId(clientName) + .addFetch(a_topic, a_partition, readOffset, 100000) + .build(); + FetchResponse fetchResponse = null; + + try { + fetchResponse = consumer.fetch(req); + } catch (Exception e) { + + } + + /** + * SimpleConsumer does not handle lead broker failures, you have to handle it + * once the fetch returns an error, we log the reason, close the consumer then try to figure + * out who the new leader is + */ + if (fetchResponse.hasError()) { + numErrors++; + // Something went wrong! + short code = fetchResponse.errorCode(a_topic, a_partition); + System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); + if (numErrors > 5) break; + if (code == ErrorMapping.OffsetOutOfRangeCode()) { + // We asked for an invalid offset. For simple case ask for the last element to reset + //readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); + continue; + } + consumer.close(); + consumer = null; + try { + leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + continue; + } + //End Error handling + + // Reading data cont. 
+ numErrors = 0; + + long numRead = 0; + Iterator it = (Iterator) fetchResponse.messageSet(a_topic, a_partition).iterator(); + MessageAndOffset messageAndOffset = null; + + try { + messageAndOffset = (MessageAndOffset) it.next(); + } catch (Exception e) { + return null; + } + + //for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { + long currentOffset = messageAndOffset.offset(); + if (currentOffset < readOffset) { + System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); + continue; + } + readOffset = messageAndOffset.nextOffset(); + ByteBuffer payload = messageAndOffset.message().payload(); + + byte[] bytes = new byte[payload.limit()]; + payload.get(bytes); + try { + System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); + answer = String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"); + returnInstances.add(answer); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + + numRead++; + a_maxReads--; + // break; + //} + + if (numRead == 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + } + } + } + if (consumer != null) consumer.close(); + return returnInstances; + } + + /** + * Defines where to start reading data from + * Helpers Available: + * kafka.api.OffsetRequest.EarliestTime() => finds the beginning of the data in the logs and starts streaming + * from there + * kafka.api.OffsetRequest.LatestTime() => will only stream new messages + * + * @param consumer + * @param topic + * @param partition + * @param whichTime + * @param clientName + * @return + */ + public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map requestInfo = new HashMap(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); + 
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); + return 0; + } + long[] offsets = response.offsets(topic, partition); + return offsets[0]; + } + + /** + * Uses the findLeader() logic we defined to find the new leader, except here we only try to connect to one of the + * replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested + * in we give up and exit hard. + * + * @param a_oldLeader + * @param a_topic + * @param a_partition + * @param a_port + * @return + * @throws Exception + */ + private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { + for (int i = 0; i < 3; i++) { + boolean goToSleep = false; + PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); + if (metadata == null) { + goToSleep = true; + } else if (metadata.leader() == null) { + goToSleep = true; + } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { + // first time through if the leader hasn't changed give ZooKeeper a second to recover + // second time, assume the broker did recover before failover, or it was a non-Broker issue + // + goToSleep = true; + } else { + return metadata.leader().host(); + } + if (goToSleep) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + } + } + } + System.out.println("Unable to find new leader after Broker failure. Exiting"); + throw new Exception("Unable to find new leader after Broker failure. 
Exiting"); + } + + /** + * Query a live broker to find out leader information and replica information for a given topic and partition + * + * @param a_seedBrokers + * @param a_port + * @param a_topic + * @param a_partition + * @return + */ + private PartitionMetadata findLeader(List a_seedBrokers, int a_port, String a_topic, int a_partition) { + PartitionMetadata returnMetaData = null; + for (String seed : a_seedBrokers) { + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); //broker_host, broker_port, timeout, buffer_size, client_id + List topics = new ArrayList(); + topics.add(a_topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); + + //call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in + List metaData = resp.topicsMetadata(); + //loop on partitionsMetadata iterates through all the partitions until we find the one we want. 
+ for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == a_partition) { + returnMetaData = part; + break; + } + } + } + } catch (Exception e) { + System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + + ", " + a_partition + "] Reason: " + e); + } finally { + if (consumer != null) consumer.close(); + } + } + // add replica broker info to m_replicaBrokers + if (returnMetaData != null) { + m_replicaBrokers.clear(); + for (kafka.cluster.Broker replica : returnMetaData.replicas()) { + m_replicaBrokers.add(replica.host()); + } + } + return returnMetaData; + } +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java new file mode 100644 index 00000000..610cd287 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java @@ -0,0 +1,201 @@ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; +import org.apache.samoa.instances.*; +import org.apache.samoa.moa.core.Example; +import org.apache.samoa.moa.core.FastVector; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.moa.core.ObjectRepository; +import org.apache.samoa.moa.options.AbstractOptionHandler; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.tasks.TaskMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +public class KafkaStream extends AbstractOptionHandler implements + InstanceStream { + + private static final long serialVersionUID = 1L; + + protected InstancesHeader streamHeader; + + protected Instances instances; + + private KafkaReader reader; + + protected InstanceExample lastInstanceRead; + + protected Queue instanceQueue; + + private static final Logger logger = LoggerFactory + .getLogger(KafkaStream.class); + + public void KafkaReader() { + reader = new KafkaReader(); + } + + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 
0 for none or -1 for last attribute in file.", + -1, -1, Integer.MAX_VALUE); + + public IntOption numAttrOption = new IntOption("numNumerics", 'u', + "The number of numeric attributes in" + + " dataset", 300, 0, + Integer.MAX_VALUE); + + public StringOption topicOption = new StringOption("topic", 't', + "Topic in the kafka to be used for reading data", "test"); + + public IntOption numMaxreadOption = new IntOption("numMaxread", 'r', + "Number of instances to be read in single read from kafka", 1, 0, + Integer.MAX_VALUE); + + public IntOption partitionOption = new IntOption("partition", 'n', + "Partition number to be used for reading data", 0); + + public IntOption portOption = new IntOption("port", 'p', + "Port in kafka to read data", 9092); + + public StringOption seedOption = new StringOption("seed", 's', + "Seeds for kafka", "localhost"); + + public IntOption numClassesOption = new IntOption("numClasses", 'k', + "The number of classes in the data.", 2, 2, Integer.MAX_VALUE); + + public IntOption timeDelayOption = new IntOption("timeDelay", 'y', + "Time delay in milliseconds between two read from kafka", 0, 0, Integer.MAX_VALUE); + + @Override + protected void prepareForUseImpl(TaskMonitor monitor, + ObjectRepository repository) { + this.reader = new KafkaReader(); + generateHeader(); + instanceQueue = new LinkedList(); + this.lastInstanceRead = null; + } + + protected void generateHeader() { + FastVector attributes = new FastVector<>(); + + for (int i = 0; i < this.numAttrOption.getValue(); i++) { + attributes.addElement(new Attribute("numeric" + (i + 1))); + } + FastVector classLabels = new FastVector<>(); + for (int i = 0; i < this.numClassesOption.getValue(); i++) { + classLabels.addElement("class" + (i + 1)); + } + + attributes.addElement(new Attribute("class", classLabels)); + this.streamHeader = new InstancesHeader(new Instances( + getCLICreationString(InstanceStream.class), attributes, 0)); + + if (this.classIndexOption.getValue() < 0) { + 
this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.streamHeader.setClassIndex(this.classIndexOption.getValue() - 1); + } + + } + + @Override + public InstancesHeader getHeader() { + return this.streamHeader; + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + private String getNextInstanceFromKafka() { + if (!instanceQueue.isEmpty()) { + return instanceQueue.remove(); + } + + List seeds = new ArrayList(); + seeds.add(this.seedOption.getValue()); + ArrayList kafkaData; + + do { + kafkaData = this.reader.run(this.numMaxreadOption.getValue(), + this.topicOption.getValue(), this.partitionOption.getValue(), + seeds, this.portOption.getValue()); + } while (kafkaData == null); + + instanceQueue.addAll(kafkaData); + return instanceQueue.remove(); + } + + @Override + public Example nextInstance() { + InstancesHeader header = getHeader(); + Instance inst = new DenseInstance(header.numAttributes()); + + String kafkaString = getNextInstanceFromKafka(); + String[] KeyValueString = kafkaString.split(":"); + String[] attributes = KeyValueString[1].split(","); + + for (int i = 0; i < attributes.length - 1; i++) { + if (i < numAttrOption.getValue()) { + inst.setValue(i, Double.parseDouble(attributes[i])); + } + } + inst.setDataset(header); + inst.setClassValue(Double + .parseDouble(attributes[attributes.length - 1])); + + try { + Thread.sleep(timeDelayOption.getValue()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new InstanceExample(inst); + } + + @Override + public boolean isRestartable() { + // TODO Auto-generated method stub + return true; + } + + @Override + public void restart() { + this.reader = new KafkaReader(); + } + + @Override + public boolean hasMoreInstances() { + return true; + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } +} From 
8f7c5df0295409fa465963fde6f5d1a6a228abef Mon Sep 17 00:00:00 2001 From: Vishal Karande Date: Thu, 16 Jul 2015 01:40:21 -0700 Subject: [PATCH 2/3] SAMOA 40: Removing build data in samoa-api --- .../current/VERSION | 5 ----- .../current/finalized/blk_1073741825 | 1 - .../current/finalized/blk_1073741825_1001.meta | Bin 11 -> 0 bytes .../current/finalized/blk_1073741827 | 1 - .../current/finalized/blk_1073741827_1003.meta | Bin 11 -> 0 bytes .../dncp_block_verification.log.curr | 0 .../dncp_block_verification.log.prev | 0 .../test/data/dfs/data/data1/current/VERSION | 6 ------ .../current/VERSION | 5 ----- .../current/finalized/blk_1073741826 | 1 - .../current/finalized/blk_1073741826_1002.meta | Bin 11 -> 0 bytes .../current/finalized/blk_1073741828 | 1 - .../current/finalized/blk_1073741828_1004.meta | Bin 11 -> 0 bytes .../test/data/dfs/data/data2/current/VERSION | 6 ------ .../build/test/data/dfs/name1/current/VERSION | 7 ------- ...edits_0000000000000000001-0000000000000000023 | Bin 1595 -> 0 bytes .../name1/current/fsimage_0000000000000000000 | Bin 200 -> 0 bytes .../current/fsimage_0000000000000000000.md5 | 1 - .../build/test/data/dfs/name1/current/seen_txid | 1 - .../build/test/data/dfs/name2/current/VERSION | 7 ------- ...edits_0000000000000000001-0000000000000000023 | Bin 1595 -> 0 bytes .../name2/current/fsimage_0000000000000000000 | Bin 200 -> 0 bytes .../current/fsimage_0000000000000000000.md5 | 1 - .../build/test/data/dfs/name2/current/seen_txid | 1 - 24 files changed, 44 deletions(-) delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825_1001.meta delete mode 100644 
samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827_1003.meta delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.curr delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.prev delete mode 100644 samoa-api/build/test/data/dfs/data/data1/current/VERSION delete mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION delete mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 delete mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826_1002.meta delete mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 delete mode 100644 samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828_1004.meta delete mode 100644 samoa-api/build/test/data/dfs/data/data2/current/VERSION delete mode 100644 samoa-api/build/test/data/dfs/name1/current/VERSION delete mode 100644 samoa-api/build/test/data/dfs/name1/current/edits_0000000000000000001-0000000000000000023 delete mode 100644 samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000 delete mode 100644 samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 delete mode 100644 samoa-api/build/test/data/dfs/name1/current/seen_txid delete mode 100644 samoa-api/build/test/data/dfs/name2/current/VERSION delete mode 100644 
samoa-api/build/test/data/dfs/name2/current/edits_0000000000000000001-0000000000000000023 delete mode 100644 samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000 delete mode 100644 samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 delete mode 100644 samoa-api/build/test/data/dfs/name2/current/seen_txid diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION deleted file mode 100644 index 6f667e76..00000000 --- a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION +++ /dev/null @@ -1,5 +0,0 @@ -#Wed Jul 15 17:02:57 PDT 2015 -namespaceID=1199357958 -cTime=0 -blockpoolID=BP-511502848-10.14.2.232-1437004977072 -layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 deleted file mode 100644 index 56a6051c..00000000 --- a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825 +++ /dev/null @@ -1 +0,0 @@ -1 \ No newline at end of file diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825_1001.meta b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741825_1001.meta deleted file mode 100644 index d50ad1a205f828edf1e97876feda8eddf081d8b4..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11 ScmZQzWMW`oVwmuC=3@W{X9EWS diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 
b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 deleted file mode 100644 index e440e5c8..00000000 --- a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827 +++ /dev/null @@ -1 +0,0 @@ -3 \ No newline at end of file diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827_1003.meta b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741827_1003.meta deleted file mode 100644 index cbade407d6ce22f59bcc66c610116c38f97ad15c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11 ScmZQzWMW`oVkkWKQUm}7M*>0s diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.curr b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.curr deleted file mode 100644 index e69de29b..00000000 diff --git a/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.prev b/samoa-api/build/test/data/dfs/data/data1/current/BP-511502848-10.14.2.232-1437004977072/dncp_block_verification.log.prev deleted file mode 100644 index e69de29b..00000000 diff --git a/samoa-api/build/test/data/dfs/data/data1/current/VERSION b/samoa-api/build/test/data/dfs/data/data1/current/VERSION deleted file mode 100644 index a7666b04..00000000 --- a/samoa-api/build/test/data/dfs/data/data1/current/VERSION +++ /dev/null @@ -1,6 +0,0 @@ -#Wed Jul 15 17:02:57 PDT 2015 -storageID=DS-1694288922-10.14.2.232-57060-1437004977323 -clusterID=testClusterID -cTime=0 -storageType=DATA_NODE -layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION 
b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION deleted file mode 100644 index 6f667e76..00000000 --- a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/VERSION +++ /dev/null @@ -1,5 +0,0 @@ -#Wed Jul 15 17:02:57 PDT 2015 -namespaceID=1199357958 -cTime=0 -blockpoolID=BP-511502848-10.14.2.232-1437004977072 -layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 deleted file mode 100644 index d8263ee9..00000000 --- a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826 +++ /dev/null @@ -1 +0,0 @@ -2 \ No newline at end of file diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826_1002.meta b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741826_1002.meta deleted file mode 100644 index b9fd814b2b8a461d2bb77f7e311ee756eccdf014..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11 ScmZQzWMW`oVrX8PB@O@u83C^V diff --git a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 deleted file mode 100644 index bf0d87ab..00000000 --- a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828 +++ /dev/null @@ -1 +0,0 @@ -4 \ No newline at end of file diff --git 
a/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828_1004.meta b/samoa-api/build/test/data/dfs/data/data2/current/BP-511502848-10.14.2.232-1437004977072/current/finalized/blk_1073741828_1004.meta deleted file mode 100644 index f79a87584ad17e32fc9c5c94051b098a3df1c3ac..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11 ScmZQzWMW`oVpz)3`yT)V>;gLg diff --git a/samoa-api/build/test/data/dfs/data/data2/current/VERSION b/samoa-api/build/test/data/dfs/data/data2/current/VERSION deleted file mode 100644 index a7666b04..00000000 --- a/samoa-api/build/test/data/dfs/data/data2/current/VERSION +++ /dev/null @@ -1,6 +0,0 @@ -#Wed Jul 15 17:02:57 PDT 2015 -storageID=DS-1694288922-10.14.2.232-57060-1437004977323 -clusterID=testClusterID -cTime=0 -storageType=DATA_NODE -layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/name1/current/VERSION b/samoa-api/build/test/data/dfs/name1/current/VERSION deleted file mode 100644 index 40ed1e62..00000000 --- a/samoa-api/build/test/data/dfs/name1/current/VERSION +++ /dev/null @@ -1,7 +0,0 @@ -#Wed Jul 15 17:02:57 PDT 2015 -namespaceID=1199357958 -clusterID=testClusterID -cTime=0 -storageType=NAME_NODE -blockpoolID=BP-511502848-10.14.2.232-1437004977072 -layoutVersion=-47 diff --git a/samoa-api/build/test/data/dfs/name1/current/edits_0000000000000000001-0000000000000000023 b/samoa-api/build/test/data/dfs/name1/current/edits_0000000000000000001-0000000000000000023 deleted file mode 100644 index 5f123dabf14053ad350d6f44ba363c47f276e270..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1595 zcmezW|Nlh^1_)qm6FU;f4COFEXa^<+9{t?Ryv&re;*iwh5(WlFzbV>R~V+@a<$Kn-+YW)MO)NZ(Meq@sj@5o(YEf@Wagfa=9+ z_!0&s7q?*NoXpg`l6XIVKVQdyAXk@AXV-XB14|2Y3qxbWctZwGLnCuN10XbH5C~ND zUV6XIBW79Hhx~-_Ei-_o7bom!Sq`)l3|K&<1JD6sn}0o!2MaMUutI3&mkR>KT_vG{ zY}ot&^a4aXLm%@?28RFt|Npzqx^yllR1Z6pX27Nw?td*T{zn)LF#^nIU|@cUJzN&B 
z3C)7p1r0$57BowZPy$^CYAKMWN1#`}pC4SH0CgfbcpMlQnC!m#@WKL}3nIz<>e+)% zJy@V~WAg(nY?y$7{t6Q4ddn63V1dp9wFN!UQT=a*#s3I{Ax6Lg{T22=pLO`JFU&4p z2*ZID&B4Yffo=h{6iCw}(Ca3BTwkF8aTx<2h;{%bA>GM>L9jsQhe$HNzFqL=J1o!z zu=xQNHq5|4e+>!roZ!V%V1X_OF$hSa=5$p51FZ)aA;|tm7z{B27U-|B2l`4|V8Vt1 zAqee&o;6KS0^JE}DUhZ|pf~$@Oqr$taT$X!h<0FLV0l-ecpVn#A`nUDH`$F_f4~A= z6fNMuet?Ay3oy{%Kmz@oP+l1<(8VAI0ZG(ANA3!^=?G~ diff --git a/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000 b/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000 deleted file mode 100644 index 4e548f68afe496ffcf56703830a0ba7bf682eab9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 200 zcmezW|NlkzngeVM5Wonbm|vjM5S{~sLLwcY>hTM~W&Z;~J%~U8j2vaziA9NdDXCn= Yr3I-)=|%aa1&nVYda#%eHw={l0QuM?M*si- diff --git a/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 b/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 deleted file mode 100644 index 95a6960e..00000000 --- a/samoa-api/build/test/data/dfs/name1/current/fsimage_0000000000000000000.md5 +++ /dev/null @@ -1 +0,0 @@ -5b946996f61c736be7bcef1bb0825e57 *fsimage_0000000000000000000 diff --git a/samoa-api/build/test/data/dfs/name1/current/seen_txid b/samoa-api/build/test/data/dfs/name1/current/seen_txid deleted file mode 100644 index d00491fd..00000000 --- a/samoa-api/build/test/data/dfs/name1/current/seen_txid +++ /dev/null @@ -1 +0,0 @@ -1 diff --git a/samoa-api/build/test/data/dfs/name2/current/VERSION b/samoa-api/build/test/data/dfs/name2/current/VERSION deleted file mode 100644 index 40ed1e62..00000000 --- a/samoa-api/build/test/data/dfs/name2/current/VERSION +++ /dev/null @@ -1,7 +0,0 @@ -#Wed Jul 15 17:02:57 PDT 2015 -namespaceID=1199357958 -clusterID=testClusterID -cTime=0 -storageType=NAME_NODE -blockpoolID=BP-511502848-10.14.2.232-1437004977072 -layoutVersion=-47 diff --git 
a/samoa-api/build/test/data/dfs/name2/current/edits_0000000000000000001-0000000000000000023 b/samoa-api/build/test/data/dfs/name2/current/edits_0000000000000000001-0000000000000000023 deleted file mode 100644 index 5f123dabf14053ad350d6f44ba363c47f276e270..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1595 zcmezW|Nlh^1_)qm6FU;f4COFEXa^<+9{t?Ryv&re;*iwh5(WlFzbV>R~V+@a<$Kn-+YW)MO)NZ(Meq@sj@5o(YEf@Wagfa=9+ z_!0&s7q?*NoXpg`l6XIVKVQdyAXk@AXV-XB14|2Y3qxbWctZwGLnCuN10XbH5C~ND zUV6XIBW79Hhx~-_Ei-_o7bom!Sq`)l3|K&<1JD6sn}0o!2MaMUutI3&mkR>KT_vG{ zY}ot&^a4aXLm%@?28RFt|Npzqx^yllR1Z6pX27Nw?td*T{zn)LF#^nIU|@cUJzN&B z3C)7p1r0$57BowZPy$^CYAKMWN1#`}pC4SH0CgfbcpMlQnC!m#@WKL}3nIz<>e+)% zJy@V~WAg(nY?y$7{t6Q4ddn63V1dp9wFN!UQT=a*#s3I{Ax6Lg{T22=pLO`JFU&4p z2*ZID&B4Yffo=h{6iCw}(Ca3BTwkF8aTx<2h;{%bA>GM>L9jsQhe$HNzFqL=J1o!z zu=xQNHq5|4e+>!roZ!V%V1X_OF$hSa=5$p51FZ)aA;|tm7z{B27U-|B2l`4|V8Vt1 zAqee&o;6KS0^JE}DUhZ|pf~$@Oqr$taT$X!h<0FLV0l-ecpVn#A`nUDH`$F_f4~A= z6fNMuet?Ay3oy{%Kmz@oP+l1<(8VAI0ZG(ANA3!^=?G~ diff --git a/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000 b/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000 deleted file mode 100644 index 4e548f68afe496ffcf56703830a0ba7bf682eab9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 200 zcmezW|NlkzngeVM5Wonbm|vjM5S{~sLLwcY>hTM~W&Z;~J%~U8j2vaziA9NdDXCn= Yr3I-)=|%aa1&nVYda#%eHw={l0QuM?M*si- diff --git a/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 b/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 deleted file mode 100644 index 95a6960e..00000000 --- a/samoa-api/build/test/data/dfs/name2/current/fsimage_0000000000000000000.md5 +++ /dev/null @@ -1 +0,0 @@ -5b946996f61c736be7bcef1bb0825e57 *fsimage_0000000000000000000 diff --git a/samoa-api/build/test/data/dfs/name2/current/seen_txid b/samoa-api/build/test/data/dfs/name2/current/seen_txid deleted file mode 100644 
index d00491fd..00000000 --- a/samoa-api/build/test/data/dfs/name2/current/seen_txid +++ /dev/null @@ -1 +0,0 @@ -1 From f66258a733657717d82c2b6bf46619264d4ef5d1 Mon Sep 17 00:00:00 2001 From: karande Date: Fri, 31 Jul 2015 11:06:03 -0700 Subject: [PATCH 3/3] SAMOA 40: Adding code review changes --- samoa-api/pom.xml | 7 +- .../samoa/streams/kafka/KafkaReader.java | 82 ++++++++----------- .../samoa/streams/kafka/KafkaStream.java | 61 +++++++------- .../streams/kafka/KafkaToInstanceMapper.java | 57 +++++++++++++ 4 files changed, 122 insertions(+), 85 deletions(-) create mode 100644 samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 30756be2..d0219036 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -101,15 +101,10 @@ ${hadoop.version} test - - org.apache.kafka - kafka-clients - 0.8.2.1 - org.apache.kafka kafka_2.10 - 0.8.1 + ${kafka.version} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java index 8097ddd5..a6ac157d 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java @@ -28,7 +28,8 @@ import kafka.javaapi.*; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.*; @@ -38,6 +39,16 @@ public class KafkaReader { protected long readOffset; private List m_replicaBrokers = new ArrayList(); + private static final int kafkaConnectionTimeOut = 100000; + private static final int kafkaBufferSize = 64 * 1024; + private static final int fetchSizeBytes = 100000; + private static final int readErrorThreshold = 5; + private static final int numLeaderCheckAttempts = 3; + private static final 
int delayLeaderCheckAttempts = 100; + private static final String leaderClientID = "leaderLookup"; + + private static final Logger logger = LoggerFactory + .getLogger(KafkaReader.class); public KafkaReader() { m_replicaBrokers = new ArrayList(); @@ -45,47 +56,39 @@ public KafkaReader() { } public ArrayList run(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) { - - // find the meta data about the topic and partition we are interested in - String answer = ""; + String message = ""; ArrayList returnInstances = new ArrayList(); PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); if (metadata == null) { - System.out.println("Can't find metadata for Topic and Partition. Exiting"); - return null; + throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting"); } if (metadata.leader() == null) { - System.out.println("Can't find Leader for Topic and Partition. Exiting"); - return null; + throw new IllegalArgumentException("Can't find Leader for Topic and Partition. 
Exiting"); } String leadBroker = metadata.leader().host(); String clientName = "Client_" + a_topic + "_" + a_partition; - SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); - //long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); - //readOffset = 0L; + SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, kafkaConnectionTimeOut, kafkaBufferSize, clientName); int numErrors = 0; while (a_maxReads > 0) { if (consumer == null) { - consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); + consumer = new SimpleConsumer(leadBroker, a_port, kafkaConnectionTimeOut, kafkaBufferSize, clientName); } - /** - * Reading data - */ + // reading data FetchRequest req = new FetchRequestBuilder() .clientId(clientName) - .addFetch(a_topic, a_partition, readOffset, 100000) + .addFetch(a_topic, a_partition, readOffset, fetchSizeBytes) .build(); FetchResponse fetchResponse = null; try { fetchResponse = consumer.fetch(req); } catch (Exception e) { - + logger.error("ERROR occoured during fetching data from Kafka"); } /** - * SimpleConsumer does not handle lead broker failures, you have to handle it + * SimpleConsumer does not handle lead broker failures, you have to handle it * once the fetch returns an error, we log the reason, close the consumer then try to figure * out who the new leader is */ @@ -93,11 +96,10 @@ public ArrayList run(long a_maxReads, String a_topic, int a_partition, L numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); - System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); - if (numErrors > 5) break; + logger.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); + if (numErrors > readErrorThreshold) break; if (code == ErrorMapping.OffsetOutOfRangeCode()) { // We asked for an invalid offset. 
For simple case ask for the last element to reset - //readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); @@ -105,30 +107,26 @@ public ArrayList run(long a_maxReads, String a_topic, int a_partition, L try { leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); } catch (Exception e) { - // TODO Auto-generated catch block e.printStackTrace(); } continue; } - //End Error handling // Reading data cont. numErrors = 0; - - long numRead = 0; Iterator it = (Iterator) fetchResponse.messageSet(a_topic, a_partition).iterator(); MessageAndOffset messageAndOffset = null; try { messageAndOffset = (MessageAndOffset) it.next(); } catch (Exception e) { + logger.error("No more messages to read from Kafka."); return null; } - //for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { - System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); + logger.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset); continue; } readOffset = messageAndOffset.nextOffset(); @@ -137,24 +135,12 @@ public ArrayList run(long a_maxReads, String a_topic, int a_partition, L byte[] bytes = new byte[payload.limit()]; payload.get(bytes); try { - System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); - answer = String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"); - returnInstances.add(answer); + message = String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"); + returnInstances.add(message); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } - - numRead++; a_maxReads--; - // break; - //} - - if (numRead == 0) { - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - } - } } if (consumer != null) consumer.close(); 
return returnInstances; @@ -183,7 +169,7 @@ public static long getLastOffset(SimpleConsumer consumer, String topic, int part OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { - System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); + logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); @@ -203,7 +189,7 @@ public static long getLastOffset(SimpleConsumer consumer, String topic, int part * @throws Exception */ private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { - for (int i = 0; i < 3; i++) { + for (int i = 0; i < numLeaderCheckAttempts; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { @@ -213,19 +199,18 @@ private String findNewLeader(String a_oldLeader, String a_topic, int a_partition } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover // second time, assume the broker did recover before failover, or it was a non-Broker issue - // goToSleep = true; } else { return metadata.leader().host(); } if (goToSleep) { try { - Thread.sleep(1000); + Thread.sleep(delayLeaderCheckAttempts); } catch (InterruptedException ie) { } } } - System.out.println("Unable to find new leader after Broker failure. Exiting"); + logger.error("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. 
Exiting"); } @@ -243,7 +228,7 @@ private PartitionMetadata findLeader(List a_seedBrokers, int a_port, Str for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { - consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); //broker_host, broker_port, timeout, buffer_size, client_id + consumer = new SimpleConsumer(seed, a_port, kafkaConnectionTimeOut, kafkaBufferSize, leaderClientID); List topics = new ArrayList(); topics.add(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); @@ -261,7 +246,7 @@ private PartitionMetadata findLeader(List a_seedBrokers, int a_port, Str } } } catch (Exception e) { - System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + logger.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) consumer.close(); @@ -276,4 +261,5 @@ private PartitionMetadata findLeader(List a_seedBrokers, int a_port, Str } return returnMetaData; } + } diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java index 610cd287..ae4dcde3 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java @@ -22,7 +22,10 @@ import com.github.javacliparser.IntOption; import com.github.javacliparser.StringOption; -import org.apache.samoa.instances.*; +import org.apache.samoa.instances.Attribute; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.instances.Instances; import org.apache.samoa.moa.core.Example; import org.apache.samoa.moa.core.FastVector; import org.apache.samoa.moa.core.InstanceExample; @@ -30,8 +33,6 @@ import org.apache.samoa.moa.options.AbstractOptionHandler; import 
org.apache.samoa.moa.streams.InstanceStream; import org.apache.samoa.moa.tasks.TaskMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.LinkedList; @@ -49,16 +50,14 @@ public class KafkaStream extends AbstractOptionHandler implements private KafkaReader reader; - protected InstanceExample lastInstanceRead; + private KafkaToInstanceMapper mapper; - protected Queue instanceQueue; + protected InstanceExample lastInstanceRead; - private static final Logger logger = LoggerFactory - .getLogger(KafkaStream.class); + List seeds = new ArrayList(); - public void KafkaReader() { - reader = new KafkaReader(); - } + // This is used to buffer messages read from kafka. It helps reducing number of queries to kafka + protected Queue instanceQueue; public IntOption classIndexOption = new IntOption("classIndex", 'c', "Class index of data. 0 for none or -1 for last attribute in file.", @@ -66,8 +65,7 @@ public void KafkaReader() { public IntOption numAttrOption = new IntOption("numNumerics", 'u', "The number of numeric attributes in" + - " dataset", 300, 0, - Integer.MAX_VALUE); + " dataset", 300, 0, Integer.MAX_VALUE); public StringOption topicOption = new StringOption("topic", 't', "Topic in the kafka to be used for reading data", "test"); @@ -91,15 +89,30 @@ public void KafkaReader() { public IntOption timeDelayOption = new IntOption("timeDelay", 'y', "Time delay in milliseconds between two read from kafka", 0, 0, Integer.MAX_VALUE); + public IntOption instanceType = new IntOption("instanceType", 'i', + "Type of instance to be used. 
DenseInstance(0)/SparaseInstance(1)", 0); + + public StringOption keyValueSeparator = new StringOption("keyValueSeparator", 'a', + "Separator between key and value for string read from kafka", ":"); + + public StringOption valuesSeparator = new StringOption("valuesSeparator", 'b', + "Separator between values for string read from kafka", ","); + + public void KafkaReader() { + reader = new KafkaReader(); + } + @Override protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { this.reader = new KafkaReader(); + this.mapper = new KafkaToInstanceMapper(); generateHeader(); instanceQueue = new LinkedList(); - this.lastInstanceRead = null; + seeds.add(this.seedOption.getValue()); } + protected void generateHeader() { FastVector attributes = new FastVector<>(); @@ -120,7 +133,6 @@ protected void generateHeader() { } else if (this.classIndexOption.getValue() > 0) { this.streamHeader.setClassIndex(this.classIndexOption.getValue() - 1); } - } @Override @@ -138,14 +150,11 @@ private String getNextInstanceFromKafka() { return instanceQueue.remove(); } - List seeds = new ArrayList(); - seeds.add(this.seedOption.getValue()); ArrayList kafkaData; - do { kafkaData = this.reader.run(this.numMaxreadOption.getValue(), this.topicOption.getValue(), this.partitionOption.getValue(), - seeds, this.portOption.getValue()); + this.seeds, this.portOption.getValue()); } while (kafkaData == null); instanceQueue.addAll(kafkaData); @@ -155,20 +164,10 @@ private String getNextInstanceFromKafka() { @Override public Example nextInstance() { InstancesHeader header = getHeader(); - Instance inst = new DenseInstance(header.numAttributes()); - + Instance inst; String kafkaString = getNextInstanceFromKafka(); - String[] KeyValueString = kafkaString.split(":"); - String[] attributes = KeyValueString[1].split(","); - - for (int i = 0; i < attributes.length - 1; i++) { - if (i < numAttrOption.getValue()) { - inst.setValue(i, Double.parseDouble(attributes[i])); - } - } - 
inst.setDataset(header); - inst.setClassValue(Double - .parseDouble(attributes[attributes.length - 1])); + inst = mapper.getInstance(kafkaString, keyValueSeparator.getValue(), valuesSeparator.getValue(), + instanceType.getValue(), numAttrOption.getValue(), header); try { Thread.sleep(timeDelayOption.getValue()); diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java new file mode 100644 index 00000000..23b57491 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java @@ -0,0 +1,57 @@ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.instances.SparseInstance; + +public class KafkaToInstanceMapper { + + public Instance getInstance(String message, String keyValueSeparator, String valuesSeparator, int instanceType, int numAttr, InstancesHeader header) { + Instance inst; + if (!message.isEmpty()) { + String[] KeyValueString = message.split(keyValueSeparator); + String[] attributes = KeyValueString[1].split(valuesSeparator); + inst = createInstance(header, instanceType); + for (int i = 0; i < attributes.length - 1; i++) { + if (i < numAttr) { + inst.setValue(i, Double.parseDouble(attributes[i])); + } + } + inst.setDataset(header); + inst.setClassValue(Double + .parseDouble(attributes[attributes.length - 1])); + return inst; + } + throw new IllegalArgumentException("Empty string value from Kafka"); + } + + public Instance createInstance(InstancesHeader header, int instanceType) { + Instance inst; + if (instanceType == 0) + inst = new DenseInstance(header.numAttributes()); + else + inst = new SparseInstance(header.numAttributes()); + return inst; + } +}