본문 바로가기

Programing/Java

Java pipeline framework (자바 파이프라이닝 처리)


자파 파이프라인 프레임워크 

내가 말하려는 파이프라인은 JAVA pipeline framework 모델이다. 언어는 JAVA가 아니어도 된다. 



이미지 검색을 해봤는데 마음에 드는 그림이 없어서 직접 그렸다. 

그러니까 공장에서 자동차를 조립하는 것을 상상해보면된다. 
저런 pipeline이 있고 그 line에 따라서 job(자동)가 오면 작업자1(process1), 작업자2(process2), 작업자3(process3)이 차례로 job에 대해 어떤 일을 진행한다. 

이런 종류의 프로세스는 같은 작업에 대해서 병렬적인 프로세스를 위해 쓰인다. 
저런 line이 10개면 동시에 10개의 파이프라인에서 10대의 자동차를 생산할 수 있기 때문이다. 


http://www.informit.com/articles/article.aspx?p=366887&seqNum=8 
위 주소로 가서  Java pipeline framework를 Ctrl+f로 검색해보면 java pipeline framework에 대해서 볼 수 있다. 



코드도 볼 수 있는데 PipelineStage라는 것이 위에서 말한 process단위를 말한다.  
아래는 BlockingQueue in에서 job을 꺼내서 run()에 있는 firstStep, step, lastStep을 거친다.
while문에 보면 done(완료)라는 신호를 받기 전까지 계속 진행됨을 볼 수 있다.  
즉, 자동차가 들어오면 계속한다는 것이다. 밤이 되서 공장 전기가 꺼질때 까지 ...  



Example 4.27. Base class for pipeline stages

import java.util.concurrent.*;

abstract class PipelineStage implements Runnable {

    BlockingQueue in;
    BlockingQueue out;
    CountDownLatch s;

    boolean done;

    //override to specify initialization step
   abstract void firstStep() throws Exception;
   //override to specify compute step
   abstract void step() throws Exception;
   //override to specify finalization step
   abstract void lastStep() throws Exception;
   
   
   void handleComputeException(Exception e)
   { e.printStackTrace(); }
   
   
   public void run()
   {
   try
   { firstStep();
   while(!done){ step();}
   lastStep();
   }
   catch(Exception e){handleComputeException(e);}
   finally {s.countDown();}
   }
   
   public void init(BlockingQueue in,
   BlockingQueue out,
   CountDownLatch s)
   { this.in = in; this.out = out; this.s = s;}
   
   }
   

보면 stage가 Runnable로 되어 있는 것을 볼 수 있다. 자동차 조립라인 작업자들은 thread같이 독자적으로 일하니깐 ^^ 


그럼 작업라인 즉, 전체 조립라인을 보자 

전체 파이프 라인에서는 제품이 흘러 들어오고 작업자들이 작업을 한다. 

작업자 (stage)에 흘러오는 job은 stage(작업자)의 갯수 만큼 process가 진행된 후에 끝난다. 


Example 4.28. Base class for linear pipeline

import java.util.concurrent.*;

abstract class LinearPipeline {
   PipelineStage[] stages;
   BlockingQueue[] queues;
   int numStages;
   CountDownLatch s;

   //override method to create desired array of pipeline stage objects
   abstract PipelineStage[] getPipelineStages(String[] args);
   
   //override method to create desired array of BlockingQueues
   //element i of returned array contains queue between stages i and i+1
   abstract BlockingQueue[] getQueues(String[] args);
   
   LinearPipeline(String[] args)
   { stages = getPipelineStages(args);
   queues = getQueues(args);
   numStages = stages.length;
   s = new CountDownLatch(numStages);
   
   BlockingQueue in = null;
   BlockingQueue out = queues[0];
   for (int i = 0; i != numStages; i++)
   { stages[i].init(in,out,s);
   in = out;
   if (i < numStages-2) out = queues[i+1]; else out = null;
   }
   }
   
   public void start()
   { for (int i = 0; i != numStages; i++)
   { new Thread(stages[i]).start();
   }
   }
   }



이 LinearPipeline을 thread로 돌리면 자동차를 만드는 line을 병렬적으로 돌릴 수가 있다. IO를 많이 사용하는 작업등에서 사용될 수 있는 구조다.