添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
八块腹肌的风衣  ·  Error starting ZK ...·  2 周前    · 
玉树临风的苦瓜  ·  Bigwhite's Blog·  2 周前    · 
威武的帽子  ·  Kafka 中文文档 - ApacheCN·  2 周前    · 
买醉的花卷  ·  CoreOS VS ...·  1 月前    · 
跑龙套的手链  ·  FingerPrint Fuzzy ...·  4 月前    · 
光明磊落的烤面包  ·  Essential Oils Pure ...·  7 月前    · 

Kafka => SparkStreaming => Elastic

In the last netcat => spark streaming => elastic tutorial, you would have seen the data flow from Netcat unix streams to Elastic Search through Spark Streaming.

In this tutorial, I am going to walk you through some basics of Apache Kafka technology and how to make the data movement from/out Kafka. Then we will see how Spark Streaming can consume the data produced by Kafka. Finally we will see how Spark Streaming Process can feed this data to Elastic Search Lively.

What is Kafka:

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn Corporation and later on became a part of Apache project. Kafka is a fast, scalable, distributed in nature by its design, partitioned and replicated commit log service.

This url helped me to understand the Kafka better.

Procedure to Install Kafka in Mac OS:

If you are mac/ubuntu/linux/redhat user, you can easily do it using brew install kafka once you have homebrew installed in your machine. For other OS, please review the kafka.apache.org website.

Once it’s installed do a cd /usr/local/Cellar/kafka/0.9.0.1/bin directory and start kafka services as below.

Similarly do a cd /usr/local/Cellar/elasticsearch-2.3.3/bin/ directory and start elasticsearch services as below.

#! /bin/bash
#Provide Kafka, Elastic Installed Location
export kafkaLocation=/usr/local/Cellar/kafka/0.9.0.1/bin
export elasticLocation=/usr/local/Cellar/elasticsearch-2.3.3/bin
export kafkatopicName=wordcounttopic
#Start All Kafka Services
$kafkaLocation/zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties &
$kafkaLocation/kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties &
$elasticLocation/elasticsearch &
echo 'Kafka Elasticsearch Services started'

Start the Zookeeper Services:

./zookeeper-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/zookeeper.properties

Start the Kafka Server as below:

./kafka-server-start /usr/local/Cellar/kafka/0.9.0.1/libexec/config/server.properties

Start the Kafka Producer Topic:

$kafkaLocation/kafka-console-producer --broker-list localhost:9092 --topic $kafkatopicName

I have typed the below words in the console.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

Now, we will see how Spark consumes through in built kafka consumer and sends to Elastic Search.

Spark Program : Write this in IntelliJ:

I have referred the code from databricks documentation and tweaked a bit according to the Elastic Search Indexing.

This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Show hidden characters

Once you click Run, you will see the output (wordcount.show()) in the Driver Console as below.

You will see the standard output as below. This continues to display until it receives the data from Kafka.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/16 21:30:43 INFO SparkContext: Running Spark version 1.6.1
16/06/16 21:30:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/16 21:30:45 INFO SecurityManager: Changing view acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: Changing modify acls to: vgiridatabricks
16/06/16 21:30:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vgiridatabricks); users with modify permissions: Set(vgiridatabricks)
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriver' on port 49990.
16/06/16 21:30:45 INFO Slf4jLogger: Slf4jLogger started
16/06/16 21:30:45 INFO Remoting: Starting remoting
16/06/16 21:30:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:49991]
16/06/16 21:30:45 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49991.
16/06/16 21:30:45 INFO SparkEnv: Registering MapOutputTracker
16/06/16 21:30:45 INFO SparkEnv: Registering BlockManagerMaster
16/06/16 21:30:45 INFO DiskBlockManager: Created local directory at /private/var/folders/4q/fjc7vkmn4wn_m4fynrvxk2hc0000gn/T/blockmgr-568204eb-276e-4be7-8b0d-5d451f439eef
16/06/16 21:30:45 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/06/16 21:30:45 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/16 21:30:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/06/16 21:30:46 INFO SparkUI: Started SparkUI at http://10.10.10.111:4040
16/06/16 21:30:46 INFO Executor: Starting executor ID driver on host localhost
16/06/16 21:30:46 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49992.
16/06/16 21:30:46 INFO NettyBlockTransferService: Server created on 49992
16/06/16 21:30:46 INFO BlockManagerMaster: Trying to register BlockManager
16/06/16 21:30:46 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49992 with 2.4 GB RAM, BlockManagerId(driver, localhost, 49992)
16/06/16 21:30:46 INFO BlockManagerMaster: Registered BlockManager
16/06/16 21:30:46 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/06/16 21:30:46 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:46 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:46 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
Creating function called to create new StreamingContext
16/06/16 21:30:47 INFO WriteAheadLogManager  for Thread: Recovered 2 write ahead log files from file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/receivedBlockMetadata
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval automatically set to 10000 ms
16/06/16 21:30:47 INFO ForEachDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO StateDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO MappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO FlatMappedDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO ForEachDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO StateDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO MappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO FlatMappedDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@7a45d714
16/06/16 21:30:47 INFO FlatMappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO FlatMappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO FlatMappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1dd74143
16/06/16 21:30:47 INFO MappedDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO MappedDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO MappedDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@66236a0a
16/06/16 21:30:47 INFO StateDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO StateDStream: Storage level = StorageLevel(false, true, false, false, 1)
16/06/16 21:30:47 INFO StateDStream: Checkpoint interval = 10000 ms
16/06/16 21:30:47 INFO StateDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO StateDStream: Initialized and validated org.apache.spark.streaming.dstream.StateDStream@5634a861
16/06/16 21:30:47 INFO ForEachDStream: Slide time = 2000 ms
16/06/16 21:30:47 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/16 21:30:47 INFO ForEachDStream: Checkpoint interval = null
16/06/16 21:30:47 INFO ForEachDStream: Remember duration = 60000 ms
16/06/16 21:30:47 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@5626d18c
16/06/16 21:30:47 INFO RecurringTimer: Started timer for JobGenerator at time 1466137848000
16/06/16 21:30:47 INFO JobGenerator: Started JobGenerator at 1466137848000 ms
16/06/16 21:30:47 INFO JobScheduler: Started JobScheduler
16/06/16 21:30:47 INFO StreamingContext: StreamingContext started
16/06/16 21:30:48 INFO StateDStream: Time 1466137846000 ms is invalid as zeroTime is 1466137846000 ms and slideDuration is 2000 ms and difference is 0 ms
16/06/16 21:30:48 INFO VerifiableProperties: Verifying properties
16/06/16 21:30:48 INFO VerifiableProperties: Property group.id is overridden to 
16/06/16 21:30:48 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
16/06/16 21:30:48 INFO JobScheduler: Added jobs for time 1466137848000 ms
16/06/16 21:30:48 INFO JobGenerator: Checkpointing graph for time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updating checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO JobScheduler: Starting job streaming job 1466137848000 ms.0 from job set of time 1466137848000 ms
16/06/16 21:30:48 INFO DStreamGraph: Updated checkpoint data for time 1466137848000 ms
16/06/16 21:30:48 INFO CheckpointWriter: Submitted checkpoint of time 1466137848000 ms writer queue
16/06/16 21:30:48 INFO CheckpointWriter: Saving checkpoint for time 1466137848000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000'
16/06/16 21:30:48 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000.bk
16/06/16 21:30:48 INFO CheckpointWriter: Checkpoint for time 1466137848000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137848000', took 4209 bytes and 37 ms
16/06/16 21:30:49 INFO SparkContext: Starting job: show at KafkaSSCtoES.scala:71
16/06/16 21:30:49 INFO DAGScheduler: Registering RDD 2 (map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO DAGScheduler: Got job 0 (show at KafkaSSCtoES.scala:71) with 1 output partitions
16/06/16 21:30:49 INFO DAGScheduler: Final stage: ResultStage 1 (show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/06/16 21:30:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.8 KB, free 4.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.5 KB, free 7.3 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:49992 (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at KafkaSSCtoES.scala:63)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 0
16/06/16 21:30:49 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 5 is the same as ending offset skipping wordcounttopic 1
16/06/16 21:30:49 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 90 ms on localhost (1/3)
16/06/16 21:30:49 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, partition 2,ANY, 2009 bytes)
16/06/16 21:30:49 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/06/16 21:30:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 11 ms on localhost (2/3)
16/06/16 21:30:49 INFO KafkaRDD: Beginning offset 4 is the same as ending offset skipping wordcounttopic 2
16/06/16 21:30:49 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1158 bytes result sent to driver
16/06/16 21:30:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 9 ms on localhost (3/3)
16/06/16 21:30:49 INFO DAGScheduler: ShuffleMapStage 0 (map at KafkaSSCtoES.scala:63) finished in 0.114 s
16/06/16 21:30:49 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/06/16 21:30:49 INFO DAGScheduler: looking for newly runnable stages
16/06/16 21:30:49 INFO DAGScheduler: running: Set()
16/06/16 21:30:49 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/06/16 21:30:49 INFO DAGScheduler: failed: Set()
16/06/16 21:30:49 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71), which has no missing parents
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.4 KB, free 16.8 KB)
16/06/16 21:30:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.7 KB, free 21.4 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49992 (size: 4.7 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/16 21:30:49 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at show at KafkaSSCtoES.scala:71)
16/06/16 21:30:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/06/16 21:30:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 1894 bytes)
16/06/16 21:30:49 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
16/06/16 21:30:49 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/06/16 21:30:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:49992 in memory (size: 2.5 KB, free: 2.4 GB)
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
16/06/16 21:30:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
16/06/16 21:30:49 INFO MemoryStore: Block rdd_4_0 stored as bytes in memory (estimated size 4.0 B, free 14.1 KB)
16/06/16 21:30:49 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:49992 (size: 4.0 B, free: 2.4 GB)
16/06/16 21:30:50 INFO GenerateUnsafeProjection: Code generated in 175.318511 ms
16/06/16 21:30:50 INFO JobScheduler: Added jobs for time 1466137850000 ms
16/06/16 21:30:50 INFO JobGenerator: Checkpointing graph for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updating checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO DStreamGraph: Updated checkpoint data for time 1466137850000 ms
16/06/16 21:30:50 INFO CheckpointWriter: Submitted checkpoint of time 1466137850000 ms writer queue
16/06/16 21:30:50 INFO CheckpointWriter: Saving checkpoint for time 1466137850000 ms to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000'
16/06/16 21:30:50 INFO GenerateSafeProjection: Code generated in 12.79301 ms
16/06/16 21:30:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1915 bytes result sent to driver
16/06/16 21:30:50 INFO CheckpointWriter: Deleting file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1465878822000
16/06/16 21:30:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 493 ms on localhost (1/1)
16/06/16 21:30:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/06/16 21:30:50 INFO CheckpointWriter: Checkpoint for time 1466137850000 ms saved to file 'file:/usr/local/Cellar/kafka/0.9.0.1/checkpoint/checkpoint-1466137850000', took 4259 bytes and 27 ms
16/06/16 21:30:50 INFO DAGScheduler: ResultStage 1 (show at KafkaSSCtoES.scala:71) finished in 0.496 s
16/06/16 21:30:50 INFO DAGScheduler: Job 0 finished: show at KafkaSSCtoES.scala:71, took 0.865830 s
+----+-----+
|word|count|
+----+-----+
+----+-----+

Now I  hit the Enter in the Kafka Producer Console and checking the Driver Output in IntelliJ.

Now see the Output in Elastic Search head plugin :

Start the elasticsearch service as below.

$ cd /usr/local/Cellar/elastic*/bin

./elasticsearch

As per the Above program we are creating the Index Name and Type as kafkawordcount/kwc. Below snapshot shows you the head plugin before the execution of Spark Steaming job. You can see no index name called kafkawordcount

See the out put in elastic search head plugin after First Hit in the Kafka Producer Console of the below words.

Hello Kafka from Spark Streaming Learning Kafka is fun. and learning kafka with spark streaming is more fun and even when the destination is elasticsearch it will be more fun

Screen Shot 2016-06-16 at 9.44.02 PM See the full output:

Hope you enjoyed this tutorial.

Hey. Thanks for the tutorial. I tried to publish protobuf encoded stream to a kafka broker but it seems that the spark instance crashes if I am subscribed to that topic even if I’m not doing anything with that data. Can you (or someone from the community) help me with figuring out how spark will “not” crash if I throw out bytestreams (like protobufs). It seems non-ascii data is what causes the crashes.
I am looking (albeit rather unsuccessfully) into streaming protocol buffers over Kafka for IoT data.

Thanks.

Like

That’s correct. Spark crashes for any non ascii present in your data streams. You might want to use utf-8 conversion or ignore before sending to worker nodes. I think one of my other blog explains this. Please search for it.

Like

Hey, thanks for the reply. I solved the issue by overriding the default utf8 conversion to string in pyspark. Do you know why utf8 is the default? It has caused me quite a few headaches. IMO, a bytestream (or equivalent) should be default.

Liked by 1 person

Hello Giri,
I am not able to resolve import error

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object kafka is not a member of package org.apache.spark.streaming
not found: object kafka

Getting this two error in ScalaIDE and while compile in SBT also – i have included the dependencies, not sure what I am missing, can you please help me.

Providing some details below, if you need additional information, please let me know.

My Build.sbt

libraryDependencies ++= Seq(
“org.apache.spark” %% “spark-core” % “2.0.0” % “provided”,
“org.apache.spark” %% “spark-streaming” % “2.0.0” % “provided”,
“org.apache.spark” %% “spark-streaming-kafka-0-8” % “2.0.0”,
“org.apache.kafka” % “kafka_2.11” % “0.9.0.1”

Error while compile using sbt:

[error] C:\Parthi\Knowledge\kafkaspark\src\main\scala\KafkaExample.scala:36: value createDirectStream is not a member of object org.apache.spark.streaming.kafka.KafkaUtils
[error] val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Feb 11, 2017 2:41:06 PM

Like