spark streaming 테스르를 진행할 때 보통은 카프카나 파일을 읽는 것부터 시작하는데.

이번에는 메모리가 있나 해서 보니. 메모리가 있다. 

 

producer 코드는 단순히 쓰레드로 만드는 코드를 추가하고

이를 spark streaming 에서 쉽게 처리하는 간단한 예제가 있다. 

 

package streaming

import org.apache.spark.streaming.StreamingContext
import streaming.KafkaAvroDemo.INTERVAL
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.joda.time.DateTime

object MemoryStreamDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .config("spark.master", "local[2]")
      .getOrCreate()


    val SLEEP_TIME = 3000L

    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, INTERVAL)

    val inputStream = new MemoryStream[Int](1, spark.sqlContext)

    new Thread(new Runnable {
      override def run() = {
        while (true) {
          inputStream.addData(1, 2, 3, 4, 5)
          Thread.sleep(SLEEP_TIME)
        }
      }
    }).start()

    val stream = inputStream.toDS().toDF("number")

    val query = stream.writeStream.trigger(Trigger.ProcessingTime(SLEEP_TIME))
      .foreachBatch((dataset, batchId) => {
        dataset.foreachPartition(rows => {
          rows.foreach(row => {
            println(new DateTime())
            println(s"batch : ${batchId} , value : ${row}")
          })
        })
      })
      .start()

    query.awaitTermination(20000L)
  }
}

 

결과는 다음과 같다.

 

 

batch : 0 , value : [1]
batch : 0 , value : [2]
batch : 0 , value : [3]
batch : 0 , value : [4]
batch : 0 , value : [5]

 

batch : 0 , value : [1]
batch : 0 , value : [2]
batch : 0 , value : [3]
batch : 0 , value : [4]
batch : 0 , value : [5]

 

....

 

Posted by 김용환 '김용환'

댓글을 달아 주세요

git 저장소가 변경되어서 이미 git clone 디렉토리에서 remote url을 변경하려면 다음 커맨드를 사용한다

git remote set-url origin new_url

Posted by 김용환 '김용환'

댓글을 달아 주세요

Warning: Using a password on the command line interface can be insecure 

에러를 해결하려면

 

https://knight76.tistory.com/entry/mariadbmysqldbWarning-Using-a-password-on-the-command-line-interface-can-be-insecure 처럼 진행해야 한다.

 

[mariadb/mysqldb]Warning: Using a password on the command line interface can be insecure

shell 에서 mysql db접속시 패스워드를 입력하면 mysql 5.6 부터는 Warning 문구가 발생한다. $ mysql #{database} -u#{username} -p#{password} Warning: Using a password on the command line interface can be..

knight76.tistory.com

그러나, 테스트 환경 (CI/CD) 구축때는 조금 꼭 이렇게 하는게 부담스러울 수 있는데..-p패스워드 처럼 편한게 어디 있나.. 

 

그래서 에러 출력만 안나오게 하는 방법이 있다. 2> /dev/null를 이용하는 방법이 좋을 수 있다.

 

 

docker-compose exec -T mysql mysql -uroot -p패스워드 -e "set global general_log=on" 2> /dev/null

 

그러나 확인할 때는 > /dev/null 2>1 를 사용할 수 도 있다. 

 

 is_success=$(docker-compose exec -T mysql mysql -uroot -p패스워드 -e "show databases" > /dev/null 2>1 && echo "true")

 

Posted by 김용환 '김용환'

댓글을 달아 주세요

rabbitmq 버전을 확인하려면 status 커맨드로 확인할 수 있다. 밑에 보이는 버전 명을 참고 한다.

 

$ sudo rabbitmqctl status
Status of node 'rabbit@google-alpha-rabbitmq001'
[{pid,21005},
 {running_applications,
     [{rabbitmq_event_exchange,"Event Exchange Type","3.6.14"},
      {rabbitmq_top,"RabbitMQ Top","3.6.14"},

....



 {rabbit,"RabbitMQ","3.6.14"},

...
Posted by 김용환 '김용환'

댓글을 달아 주세요

https://speakerdeck.com/ewolff/monolith-to-microservices-a-comparison-of-strategies

 

Monolith to Microservices: A Comparison of Strategies

This presentation shows several different strategies to migrate a monolith to a set of microservices.

speakerdeck.com

 

 

Posted by 김용환 '김용환'

댓글을 달아 주세요

https://github.com/google/python-fire

 

import fire

class Calculator(object):
 “”"A simple calculator class.“”"

 def double(self, number):
   return 2 * number

 def hello(self, name):
     return “hello ” + name + “!”

if __name__ == ‘__main__‘:
   fire.Fire(Calculator)


$ python calculator.py double --number=15 

30

 

$ python calculator.py double 10 

20

 

$ python test.py hello --name=“sam”
hello sam!

 

 

Posted by 김용환 '김용환'

댓글을 달아 주세요

부하 줄 때 편한 툴입니다.
https://github.com/rakyll/hey

$ go get -u github.com/rakyll/hey

$  hey -c 1 -n 200000 -q 5 -m GET http://localhost:8080/insert
초당 5개, 병렬 1, 최대 요청 200000

Posted by 김용환 '김용환'

댓글을 달아 주세요

docker compose에서 특정 컨테이너를 재시작해서 테스트하고 싶을 때가 있다.

Failover와 같은 문제를 테스트하려면 특정 시간 멈춤 뒤에 재현할 때 유용하다.

 

 

docker-compose restart mysql

 

docker-compose restart -t 30 mysql

Posted by 김용환 '김용환'

댓글을 달아 주세요

ubuntu 14:04 도커를 사용 중에 deb.debian.org 연결에 실패한다.

 

# apt-get update
Ign http://deb.debian.org jessie InRelease
Hit http://security.debian.org jessie/updates InRelease
Get:1 http://deb.debian.org jessie-updates InRelease [7340 B]
Hit http://deb.debian.org jessie Release.gpg
Hit http://deb.debian.org jessie Release
Get:2 http://security.debian.org jessie/updates/main amd64 Packages [825 kB]
Get:3 http://deb.debian.org jessie/main amd64 Packages [9098 kB]
Fetched 9930 kB in 14s (666 kB/s)
W: Failed to fetch http://deb.debian.org/debian/dists/jessie-updates/InRelease  Unable to find expected entry 'main/binary-amd64/Packages' in Release file (Wrong sources.list entry or malformed file)

E: Some index files failed to download. They have been ignored, or old ones used instead.

 


이 부분으로 되어 있을 텐데..

 

# cat /etc/apt/sources.list
deb http://deb.debian.org/debian jessie main
deb http://deb.debian.org/debian jessie-updates main
deb http://security.debian.org jessie/updates main


도메인을 변경한다. 

 

 

# vi /etc/apt/sources.list
deb http://archive.debian.org/debian/ jessie main
deb-src http://archive.debian.org/debian/ jessie main
deb http://security.debian.org jessie/updates main
deb-src http://security.debian.org jessie/updates main

 

 


나온지 오래되어서 도메인이 옮겨졌다..


 

 

from ubuntu 14를 사용하는 Dockerfile에는 아래와 같이 한 줄로 작성한다.

 

RUN printf "deb http://archive.debian.org/debian/ jessie main\ndeb-src http://archive.debian.org/debian/ jessie main\ndeb http://security.debian.org jessie/updates main\ndeb-src http://security.debian.org jessie/updates main" > /etc/apt/sources.list
Posted by 김용환 '김용환'

댓글을 달아 주세요

spark 2.4부터 kubernetes에 연동할 수 있다.

스트리밍 애플리케이션을 개발해서 배포한 내용은 다음과 같다.



kubernetes에 spark streaming job을 실행하려면 다음과 같은 형태로 submit을 해야 한다.
$ bin/spark-submit \
    --master k8s://https://: \
    --deploy-mode cluster \
    --name spark-streaming-job \
    --class main.MainClass \
    --conf spark.executor.instances=1 \
    --conf spark.kubernetes.container.image= \
    local:///path/to/examples.jar


먼저 spark이 깔린 환경으로 dockernize한다. (-m을 사용하면 minikube라서 쓰지 않는다)

$ ./bin/docker-image-tool.sh -t spark-docker build
Sending build context to Docker daemon    259MB
Step 1/15 : FROM openjdk:8-alpine
8-alpine: Pulling from library/openjdk
8e402f1a9c57: Pull complete
4866c822999c: Pull complete
ec484ea07ed1: Pull complete
Digest: sha256:066ad5ab75cfdfbeaff8481f988b4e35a04fef5d24309da2bdd5af59b983b68f
Status: Downloaded newer image for openjdk:8-alpine


실행이 완료되면 다음과 같은 docker image를 생성된 것을 볼 수 있다. 

$ docker images
REPOSITORY                         TAG                 IMAGE ID            CREATED             SIZE
spark-r                            spark-docker        47533c66d1d8        13 hours ago        740MB
spark-py                           spark-docker        b6fec2b48ea6        13 hours ago        446MB
spark                              spark-docker        b91978355818        13 hours ago        355MB



도커 허브에 로그인한다.
$ docker login -u samuel.c 사설_저장소


사용한 spark 2.4.0 환경 도커로 태깅한다.
$ docker tag spark:spark-docker 사설_저장소/samuel_c/spark_docker

spark 2.4.0 환경 도커를 도커 허브에 푸시한다.
$ docker push 사설_저장소/samuel_c/spark_docker
 

spark streaming job 애플리케이션 디렉토리에서 jar를 얻는다.
$ sbt assembly

[info] Assembly up to date: /dev/spark-streaming-job/build/kafka-spark-streaming.jar


장비를 http 서버로 보낸다.
$ scp  -o GSSAPIAuthentication=yes /dev/commerce/spark-demos/build/kafka-spark-streaming.jar ftp장비_디렉토리


잡을 k8s에 submit 한다.
$ bin/spark-submit     \
     --master k8s://https://master_주소:6443    \ 
     --deploy-mode cluster    \
     --name spark-streaming   \  
     --class streaming.KafkaAvroDemo       \
     --conf spark.kubernetes.container.image=사설_저장소/samuel_c/spark_docker  \
     http://ftp장비/kafka-spark-streaming.jar


제대로 실행 중인지 확인한다. Error와 Running을 확인할 수 있다. 
$ kubectl get pods 

spark-streaming-1555032194366-driver   0/1     Error              0          4h
spark-streaming-1555047069996-driver   1/1     Running            0          35m




http 대신 docker에 기본 spark 도커를 기반으로 할 수 있다.


Dockerfile
FROM 사설_저장소/samuel_c/spark_docker

MAINTAINER datalake

ENV http_proxy 프록시
ENV https_proxy 프록시
ENV APP_HOME /app
RUN mkdir -p $APP_HOME
WORKDIR $APP_HOME

# Upload & build source
COPY . $APP_HOME
RUN ./sbt assembly



테스트를 이렇게 할 순 있지만..
docker run -i -t 사설_저장소/samuel_c/spark-demos  /bin/bash


http로 jar를 다운받는 게 가장 빠른 것 같다.

'scala' 카테고리의 다른 글

[sbt] 1.3.0  (0) 2019.09.06
scala cats 공부 자료.  (0) 2019.06.18
[spark] kubernetes(k8s) 배포하기  (0) 2019.04.12
[sbt] spark 앱에서 Failed to find data source: kafka 해결하기  (0) 2019.04.12
sbt 병렬 다운로드  (0) 2019.04.08
sbt assembly 에러  (0) 2019.04.08
Posted by 김용환 '김용환'

댓글을 달아 주세요