라즈베리파이에 구글 음성인식 사용하기 (최신 샘플코드 + 한국어)
라즈베리파이, 오렌지파이

라즈베리파이에 구글 음성인식 사용하기 (최신 샘플코드 + 한국어)

반응형

 

 

 

구글에서 제공하는 구글 음성인식(이하 Google Speech)을 사용해보기에 앞서 아래 2개의 글을 먼저 읽고 시작하는 것을 추천한다. 시작하기 위한 필수 준비과정들을 알려준다.

 

2018/08/17 - [라즈베리파이/기초] - 라즈베리파이에서 USB 마이크, 스피커 설정하기

2018/08/18 - [라즈베리파이/기초] - 라즈베리파이에 구글 어시스턴트 설치하기 (model ID 포함)

 

위의 과정이 완료되었다면 (구글 어시스턴트를 먼저 설치해보기를 권장한다.) 먼저 라즈베리파이에 pyaudio를 설치한다.

$ sudo apt-get update
$ sudo apt-get install libportaudio0 libportaudio2 libportaudiocpp0 portaudio19-dev
$ sudo apt-get install python-dev
 
방법 1.
$ sudo pip install pyaudio
 
방법 2.
$ sudo apt-get install git
$ sudo git clone http://people.csail.mit.edu/hubert/git/pyaudio.git
$ cd pyaudio
$ sudo python setup.py install

 

혹시라도 방법 1이 되지 않는다면 방법 2를 통해 설치하면 된다. 이제 https://github.com/GoogleCloudPlatform/python-docs-samples에서 샘플코드를 받아야 하는데, 그전에 구글에 모델 등록을 해야한다. 

방법은 https://cloud.google.com/docs/authentication/getting-started에도 자세히 나와있지만 다시한번 정리해본다.

 

일단 라즈베리파이에 구글 어시스턴트를 설치하여 Google Cloud Platform에 본인의 프로젝트 하나가 있는 상태에서 Google assistant API를 활성화 시켰듯이 Google Speech API를 활성화 시킨다. (2018/08/18 - [라즈베리파이/기초] - 라즈베리파이에 구글 어시스턴트 설치하기 (model ID 포함)에 방법이 자세히 나와있다.

그 다음 서비스계정 키 생성 페이지에 접속한다.

 

 

서비스 계정 선택은 새 서비스 계성을 누르고 키 유형은 JSON을 선택한다. 이때 역활은 프로젝트의 소유자로 설정한다. 완료되면 생성 버튼을 누른다.

생성 버튼을 누르면 .json 파일이 하나가 다운로드 된다. 이 파일을 라즈베리파이로 옮겨 /home/pi 경로에 붙여넣는다. 이때 절대로 파일 이름을 변경해서는 안된다.

다시 라즈베리파이로 돌아와 터미널에 아래의 명령어를 본인에 맞게 수정하여 입력한다.

$ export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/[FILE_NAME].json"

 

강조된 부분을 수정하면 되는데, 예를 들어 아까 받은 .json 파일의 파일명이 testspeech-e88f1e852859.json 이라면 아래와 같이 입력하면 된다.

$ export GOOGLE_APPLICATION_CREDENTIALS="/home/pi/testspeech-e88f1e852859.json"

 

입력하면 별다른 변화 없이 넘어갈 것이다. 이로써 인증 작업은 완료됬으며, 이제 샘플 코드를 다운받고 실행시키는 과정만 남았다.

 

아래 명령을 통해 샘플코드들을 다운받는다.

$ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

 

다음으로 파이썬 가상환경을 실행시켜 준다. 가상환경을 설치하는 방법은 2018/08/18 - [라즈베리파이/기초] - 라즈베리파이에 구글 어시스턴트 설치하기 (model ID 포함)을 참고하자.

$ source env/bin/activate

 

이 샘플코드들 중 우리가 원하는 음성인식 코드는 python-docs-samples/speech/cloud-client/ 경로에 있다. (작성일 기준 가장 최신 샘플코드인 경우이다. 코드가 업데이트 됬다면 경로가 바뀔수도 있으니 https://github.com/GoogleCloudPlatform/python-docs-samples에서 직접 확인해 보자.) 해당 디렉토리로 이동한다.

(env) $ cd python-docs-samples/speech/cloud-client/

 

앞으로의 모든 과정은 python-docs-samples/speech/cloud-client/ 경로에서 진행된다. 이제 해당 디렉토리내에 있는 requirements.txt 파일을 설치하기 위해 아래 명령을 입력한다.

(env) $ pip install -r requirements.txt

 

이제 구글 음성인식을 사용할 준비가 모두 끝났다. 마이크를 이용해 영어 문장을 인식하는 예제 코드는 transcribe_streaming_mic.py로 저장되어 있다. 아래 명령을 이용해 실행하자.

(env) $ python transcribe_streaming_mic.py

 

이때 상당한 오류가 터미널 창에 표시되는데 신경쓰지 말자 성공적으로 실행됬다면 터미널 창에서 끝나지 않고 커서만 깜박거리는데, 이 상태에서 영어로 문장을 말해보자, 그러면 터미널에 본인이 말한 문장이 출력될 것이다. (만약 오류가 발생했다면 대부분 pyaudio가 없거나 인증에 문제가 있는 경우일 것이다.) 아래는 해당 코드이다.

반응형
#!/usr/bin/env python
 
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
"""Google Cloud Speech API sample application using the streaming API.
NOTE: This module requires the additional dependency `pyaudio`. To install
using pip:
    pip install pyaudio
Example usage:
    python transcribe_streaming_mic.py
"""
 
# [START import_libraries]
from __future__ import division
 
import re
import sys
 
from google.cloud import speech
from google.cloud.speech import enums
from google.cloud.speech import types
import pyaudio
from six.moves import queue
# [END import_libraries]
 
# Audio recording parameters
RATE = 16000
CHUNK = int(RATE / 10)  # 100ms
 
 
class MicrophoneStream(object):
    """Opens a recording stream as a generator yielding the audio chunks."""
    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk
 
        # Create a thread-safe buffer of audio data
        self._buff = queue.Queue()
        self.closed = True
 
    def __enter__(self):
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            # The API currently only supports 1-channel (mono) audio
            # https://goo.gl/z757pE
            channels=1, rate=self._rate,
            input=True, frames_per_buffer=self._chunk,
            # Run the audio stream asynchronously to fill the buffer object.
            # This is necessary so that the input device's buffer doesn't
            # overflow while the calling thread makes network requests, etc.
            stream_callback=self._fill_buffer,
        )
 
        self.closed = False
 
        return self
 
    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()
 
    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        """Continuously collect data from the audio stream, into the buffer."""
        self._buff.put(in_data)
        return None, pyaudio.paContinue
 
    def generator(self):
        while not self.closed:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]
 
            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
 
            yield b''.join(data)
# [END audio_stream]
 
 
def listen_print_loop(responses):
    """Iterates through server responses and prints them.
    The responses passed is a generator that will block until a response
    is provided by the server.
    Each response may contain multiple results, and each result may contain
    multiple alternatives; for details, see https://goo.gl/tjCPAU.  Here we
    print only the transcription for the top alternative of the top result.
    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a line feed at the end of it, to allow
    the next result to overwrite it, until the response is a final one. For the
    final one, print a newline to preserve the finalized transcription.
    """
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue
 
        # The `results` list is consecutive. For streaming, we only care about
        # the first result being considered, since once it's `is_final`, it
        # moves on to considering the next utterance.
        result = response.results[0]
        if not result.alternatives:
            continue
 
        # Display the transcription of the top alternative.
        transcript = result.alternatives[0].transcript
 
        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.
        #
        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result
        overwrite_chars = ' ' * (num_chars_printed - len(transcript))
 
        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + '\r')
            sys.stdout.flush()
 
            num_chars_printed = len(transcript)
 
        else:
            print(transcript + overwrite_chars)
 
            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r'\b(exit|quit)\b', transcript, re.I):
                print('Exiting..')
                break
 
            num_chars_printed = 0
 
 
def main():
    # See http://g.co/cloud/speech/docs/languages
    # for a list of supported languages.
    language_code = 'en-US'  # a BCP-47 language tag
 
    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code)
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)
 
    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator()
        requests = (types.StreamingRecognizeRequest(audio_content=content)
                    for content in audio_generator)
 
        responses = client.streaming_recognize(streaming_config, requests)
 
        # Now, put the transcription responses to use.
        listen_print_loop(responses)
 
 
if __name__ == '__main__':
    main()
 
 
참고로 음성인식 프로그램이 작동되는 시간은 대략 65초 정도이며 이상이 되면 자동으로 정지된다. 아마도 구글에서 트래픽 양을 조절하는 것 같은데 이 시간을 늘리는 방법은 아직 찾고 있다. 다만 엄밀히 말해 "유료 서비스" 이기 때문에 많이 사용하면 처음 만든 결제 수단에서 돈이 빠져나갈 것이다. 하지만 우리는 1년간 이용할 수있는 300달러를 제공받았으므로 전혀 걱정할 문제는 아니다.
영어 음성인식에 성공했으니 한국어 음성인식을 시도 안해볼수가 없다. 한국어 음성인식을 하기 위해서 우선 nano 편집기로 transcribe_streaming_mic.py를 연다.
(env) $ nano transcribe_streaming_mic.py
 
파일을 열어보면 아래와 같은 코드의 한 부분이 보일 것이다.
def main():
    # See http://g.co/cloud/speech/docs/languages
    # for a list of supported languages.
    language_code = 'en-US'  # a BCP-47 language tag
 
    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code)
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)
 
    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator()
        requests = (types.StreamingRecognizeRequest(audio_content=content)
                    for content in audio_generator)
 
        responses = client.streaming_recognize(streaming_config, requests)
 
        # Now, put the transcription responses to use.
        listen_print_loop(responses)

 

여기서 language_code = 'en-US'라고 적혀있으므로 이것을 

language_code = 'ko-KR'로 수정하면 한국어를 사용할 수있게 된다. 이제 ctrl+X로 저장하고 빠져나간뒤, 같은 방법으로 위 코드를 실행시켜보자. 

 

이제는 한국어로 말하면 해당 한국어를 텍스트로 보여줄 것이다.

 

인식률이 매우 뛰어난데 알아듣기 어렵다는 간장 공장장을 해보니 정확하게 알아 듣는다. 

음성인식에 성공했으니 라즈베리파이로 시도할수있는 분야가 확실히 넓어진 느낌이다.

반응형
    # 테스트용