Public | Automated Build

Last pushed: 6 months ago
Short Description
SparkStreamingFromKafka
Full Description
# Start an interactive container named "sparkstreamingfromkafka" on the host network,
# with the current directory mounted at /work; --rm removes the container on exit.
docker run                            \
  --rm                                \
  --name=sparkstreamingfromkafka      \
  --net=host                          \
  -v ${PWD}:/work                     \
  -it dserban/sparkstreamingfromkafka bash
# Open another bash shell inside the running container.
docker exec -it sparkstreamingfromkafka bash
# From the Kafka directory, print every message on the "analytics" topic,
# starting from the beginning of the topic.
cd /opt/kafka
bin/kafka-console-consumer.sh \
  --zookeeper localhost:2181  \
  --topic analytics           \
  --from-beginning
// Pre-import the scalaz syntax whenever the sbt console is started.
initialCommands in console := "import scalaz._, Scalaz._"

// All library dependencies in one place. `%%` appends the Scala binary version
// to the artifact name; plain `%` (joda-time) uses the artifact name as written.
libraryDependencies ++= Seq(
  "com.github.benfradet" %% "spark-kafka-0-10-writer"   % "0.2.0",
  "org.json4s"           %% "json4s-native"             % "3.2.11",
  "org.scalaz"           %% "scalaz-core"               % "7.2.8",
  "com.datastax.spark"   %% "spark-cassandra-connector" % "2.0.0-RC1",
  "com.holdenkarau"      %% "spark-testing-base"        % "2.1.0_0.6.0",
  "com.typesafe.akka"    %% "akka-http-core"            % "10.0.3",
  "joda-time"            %  "joda-time"                 % "2.9.7"
)

https://github.com/BenFradet/spark-kafka-writer

import org.joda.time.DateTime

// Take the current instant, floor it to the start of its hour,
// and read the result as epoch milliseconds.
val timeRightNow = new DateTime()
val startOfHour = timeRightNow.hourOfDay().roundFloorCopy()
startOfHour.getMillis
// Long = 1484593200000
import java.util.Locale
import org.joda.time.format.DateTimeFormat

// Parse a timestamp made of a month abbreviation, day, time, numeric zone offset and year.
val input = "Apr 10 18:31:45 +0000 2015"
val pattern = "MMM d HH:mm:ss Z yyyy"

// The formatter is pinned to Locale.US rather than the JVM default locale.
// Trailing dots keep the chain safe to paste into the REPL line by line.
val format = DateTimeFormat.
  forPattern(pattern).
  withLocale(Locale.US)
format.parseDateTime(input)
import java.text.SimpleDateFormat

// Parse a "yyyy-MM-dd HH:mm:ss" timestamp string into a java.util.Date.
// `f` is the SimpleDateFormat pattern describing the layout of `input`;
// it must be defined before the formatter is built.
val f = "yyyy-MM-dd HH:mm:ss"
val input = "2013-06-13 01:41:19"
// No time zone is set on the formatter, so the JVM default zone is used
// (EEST in the sample output below).
new SimpleDateFormat(f).parse(input)
// java.util.Date = Thu Jun 13 01:41:19 EEST 2013
import org.json4s.native.JsonMethods.parse

// Parse a one-field JSON object, look up "some_key", and convert its
// string value to an Int.
val parsedResult = parse("""{"some_key":"1234"}""")
val someKey = parsedResult \ "some_key"
someKey.values.toString.toInt

// Sample payload: an array of objects, each carrying an "items" list of
// string-valued transaction components.
val raw_json = """[ { "count": 2
                    , "items": [ {"transaction_component_type_id": "4","amount_in_eur": "105.00"}
                               , {"transaction_component_type_id": "3","amount_in_eur": "17.00"} ] }
                  , { "count": 2
                    , "items": [ {"transaction_component_type_id": "2","amount_in_eur": "82.00"}
                               , {"transaction_component_type_id": "3","amount_in_eur": "238.00"} ] } ]"""

val parsedResult = parse(raw_json)

// Unwrap the parsed JSON into plain Scala values, then cast to the shape the
// payload above has. The casts are unchecked, so they only hold for this shape.
val records = parsedResult.values.asInstanceOf[List[Map[String, Any]]]
val itemGroups = records.map(record => record("items")).asInstanceOf[List[List[Map[String, String]]]]

// Flatten every "items" list into (component type id, amount in EUR) pairs.
for {
  group <- itemGroups
  item  <- group
} yield (item("transaction_component_type_id").toInt, item("amount_in_eur").toDouble)
// res15: List[(Int, Double)] = List((4,105.0), (3,17.0), (2,82.0), (3,238.0))
Docker Pull Command
Owner
dserban
Source Repository

Comments (0)