めもちょー

メモ帳代わりに使っています。

ジャルジャルデータベース作ってみた

Djangoの練習として奴DBというDBのvisualizerをつくりました。
「奴」とはジャルジャルのYouTubeチャンネルでジャルジャルが演じてるキャラクターのことです。
日々、演じてるキャラクターが変わるのでDB化したいなと考えていました。

主な技術としては、Django+MySQL+YouTube APIを使いました。
YouTube APIはcronで定期実行しています。

昨年もYouTube APIをいじることがあって整備をしていたのですが、今回動かすと動かなくなっていました。
原因を調べたら、YouTube Statistics APIにおいて「低評価」が無くなったことによるDBの不整合が起きていたようでした。

いずれDockerのDjango, MySQLコンテナ等でデプロイできたらなあと思います。
ローカルではcronで処理しているYouTube APIの定期実行をクラウドで行うには、
Cloud Watch Eventsで指定した時刻を取得し、Lambdaで外部APIをたたけばよいらしいです。


DBをupdateするコードはproject/management/commands/update_videos_info.pyに記述

export YOUTUBE_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
python manage.py update_videos_info
from django.core.management.base import BaseCommand
from db.models import Video, Yatsu, VideoStatistics
import os
import sqlite3
import datetime
from apiclient.discovery import build
from dateutil.relativedelta import relativedelta
from db.models import Video, VideoStatistics
import re


class Command(BaseCommand):
    # [python manage.py help sampleBatch]で表示されるメッセージ
    help = "video, video_statisticsを更新するバッチ"

    def add_arguments(self, parser):
        # コマンドライン引数を指定
        # parser.add_argument('--check', default='default_value')
        parser.add_argument(
            "--s", dest="start_at", help="集計開始時刻 YYYY-MM-DD"
        )  # 2021-10-05 06:46:41.459086
        parser.add_argument("--e", dest="end_at", help="集計終了時刻 YYYY-MM-DD")
        # parser.add_argument('--d', dest='duration', help='期間(月)')

        # 期間

    def handle(self, *args, **options):
        youtube = YouTube()
        channel_id = "UChwgNUWPM-ksOP3BbfQHS5Q"

        # 引数のチェック
        print(options)
        # 動画情報の収集
        # 動画情報のインサート
        YYYY, MM, DD = map(int, options["end_at"].split("-"))
        end_at = datetime.datetime(YYYY, MM, DD)
        YYYY, MM, DD = map(int, options["start_at"].split("-"))
        start_at = datetime.datetime(YYYY, MM, DD)
        print("start_at: ", start_at)
        print("end_at: ", end_at)
        delta_months = int((end_at - start_at).days / 30) + 3
        anchor_at = start_at
        while anchor_at < end_at:
            _from = anchor_at.isoformat() + "Z"
            _to = (anchor_at + relativedelta(months=1)).isoformat() + "Z"
            youtube.search(channel_id, _from, _to)  # 検索
            results = youtube.get_video_details()
            # insert
            insert_to_db(results)
            youtube.reset_search_results()

            anchor_at = anchor_at + relativedelta(months=1)

    def check_argument(self, options):
        pass


class YouTube:
    def __init__(self):
        self.API_KEY = os.environ["YOUTUBE_API_KEY"]
        print(self.API_KEY)
        self.YOUTUBE_API_SERVICE_NAME = "youtube"
        self.YOUTUBE_API_VERSION = "v3"
        self.youtube = build(
            self.YOUTUBE_API_SERVICE_NAME,
            self.YOUTUBE_API_VERSION,
            developerKey=self.API_KEY,
        )
        self.videos = []

    def search(self, channel_id, _from, _to, show_flag=True):
        print(_from, _to)
        print("--" * 30)
        self.search_response = (
            self.youtube.search()
            .list(
                channelId=channel_id,
                part="snippet",
                type="video",
                maxResults=50,
                publishedAfter=_from,
                publishedBefore=_to,
                pageToken="",
            )
            .execute()
        )
        for search_result in self.search_response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":
                self.videos.append(search_result)

        try:
            nextPagetoken = self.search_response["nextPageToken"]
            self._search(nextPagetoken, _from, _to)
        except:
            return

    def reset_search_results(self):
        self.videos = []

    def get_video_details(self):
        self.video_details = []
        video_ids = [v["id"]["videoId"] for v in self.videos]
        for video_id in video_ids:
            response = (
                self.youtube.videos()
                .list(part="snippet, statistics", id=video_id)
                .execute()
            )
            for video_detail in response.get("items", []):
                if video_detail["kind"] != "youtube#video":
                    continue
                self.video_details.append(video_detail)
        return self.video_details


def get_val(d, k):
    if k not in d:
        return None
    else:
        if d[k] == "none":
            return None
        else:
            return d[k]


def get_val_with_depth(d, k_list):
    if not k_list:
        assert False, "k_listは複数"
    elif len(k_list) == 1:
        return get_val(d, k_list[0])
    elif k_list[0] in d:
        return get_val_with_depth(d[k_list[0]], k_list[1:])
    else:  # k_list[0] not in d:
        return None


def get_title_yatsu_name(s):
    if "ジャルジャルのネタのタネ" in s:
        for _ in re.split('[『』]', s):
            if _.endswith('奴'):
                title = _
                return title
    return None


def insert_to_db(search_results):
    # 検索結果を各テーブル用に分解
    for search_result in search_results:
        try:
            snippet = search_result["snippet"]
            if "defaultAudioLanguage" in snippet:
                default_audio_language = None
                if snippet["defaultAudioLanguage"] == "none":
                    default_audio_language = None
                else:
                    default_audio_language = snippet["defaultAudioLanguage"]

            video_record = {
                "video_id": search_result["id"],
                "etag": search_result["etag"],
                "url": f'https://www.youtube.com/watch?v={search_result["id"]}',
                "title": snippet["title"],
                "published_at": snippet["publishedAt"],
                "channel_id": snippet["channelId"],
                "category_id": get_val(snippet, "categoryId"),
                "live_broadcast_content": get_val(snippet, "liveBroadcastContent"),
                "default_audio_language": get_val(snippet, "default_audio_language"),
                "thumbnail_default_url": get_val_with_depth(
                    snippet, ["thumbnails", "default", "url"]
                ),  # thumbnail_default_url,#snippet['thumbnails']['default']['url'],
                "thumbnail_high_url": get_val_with_depth(
                    snippet, ["thumbnails", "high", "url"]
                ),  # snippet['thumbnails']['high']['url'],
                "thumbnail_standard_url": get_val_with_depth(
                    snippet, ["thumbnails", "standard", "url"]
                ),  # snippet['thumbnails']['standard']['url'],
                "thumbnail_maxres_url": get_val_with_depth(
                    snippet, ["thumbnails", "maxres", "url"]
                ),  # snippet['thumbnails']['maxres']['url'],
                "title_yatsu_name": get_title_yatsu_name(snippet["title"])
            }
            statistics = search_result["statistics"]
            video_statistics_record = {
                "video_id": search_result["id"],
                "view_count": statistics["viewCount"],
                "like_count": statistics["likeCount"],
                "favorite_count": statistics["favoriteCount"],
                "comment_count": statistics["commentCount"],
            }
        except Exception as e:
            print("---------------- Key Error -------------------")
            print(e)
            print(search_result)
            continue

        # レコードを挿入
        if not Video.objects.filter(video_id=video_record["video_id"]):
            video = Video(**video_record)
            video.save()

        if not VideoStatistics.objects.filter(
            video_id=video_statistics_record["video_id"]
        ):
            video_statistics = VideoStatistics(**video_statistics_record)
            video_statistics.save()

modelはひとまず下記のように定義

from django.db import models


class Video(models.Model):
    video_id = models.CharField(
        "video_id", max_length=30, primary_key=True, help_text="動画のURLにある文字列"
    )
    etag = models.CharField("etag", max_length=30)
    url = models.URLField("url")
    title = models.CharField("title", max_length=100)
    published_at = models.DateTimeField("published_at")
    channel_id = models.CharField("channel_id", max_length=30)
    category_id = models.IntegerField("category_id", null=True, blank=True)
    live_broadcast_content = models.CharField(
        "live_broadcast_content", max_length=30, null=True, blank=True
    )
    default_audio_language = models.CharField(
        "default_audio_language", max_length=30, null=True, blank=True
    )
    thumbnail_default_url = models.URLField(
        "thumbnail_default_url", null=True, blank=True
    )
    thumbnail_high_url = models.URLField("thumbnail_high_url", null=True, blank=True)
    thumbnail_standard_url = models.URLField(
        "thumbnail_standard_url", null=True, blank=True
    )
    thumbnail_maxres_url = models.URLField(
        "thumbnail_maxres_url", null=True, blank=True
    )
    title_yatsu_name = models.CharField(
        "title_yatsu_name", max_length=30, default=None, null=True, blank=True
    )  # タイトルから『』の部分の文字列
    created_at = models.DateTimeField("created_at", auto_now_add=True)
    updated_at = models.DateTimeField("updated_at", auto_now=True)


class VideoStatistics(models.Model):
    video_id = models.CharField("video_id", max_length=30, primary_key=True)
    view_count = models.IntegerField("view_content")
    like_count = models.IntegerField("like_count")
    #dislike_count = models.IntegerField("dilike_count")
    favorite_count = models.IntegerField("favorite_count")
    comment_count = models.IntegerField("comment_count")
    created_at = models.DateTimeField("created_at", auto_now_add=True)
    updated_at = models.DateTimeField("updated_at", auto_now=True)