Trailheadはじめました(2) #salesforce

10月も終わろうとしていますが、依然とほぼ同じペースで時間を確保することができ、大分進みました。

  • ランク: Ranger
  • バッジ: 202(内1つはハロウィーンのイベントバッヂ)
  • ポイント: 165,325
  • トレイル: 32

50個ごとにSuperbadgeにチャレンジするという計画は、100個取った時点で軌道修正しました。
そもそも今ある全部のバッヂを足しても300行かなそうなので…。というわけで、週末を利用してSuperbadgeはコンプリートしました。

必然的に日本語訳されていないモジュールに取り組むことも多くなりましたが、英語と日本語で内容が違っているのとかは相変わらずでしたね。
特に、Lightningコンポーネント関連のモジュールは多かった気がします。それだけ発展途上ってことなのかなと思いました。

今後は、ちょっとペースを落としつつ、お気に入りしているボリュームのあるモジュール(とプロジェクト)を消化していこうかなぁと思います。

Trailheadはじめました #salesforce

ずっと食わず嫌いで手を付けていなかった、というわけでもないのですが、ドラクエ11も裏ボスまで倒してやることがほぼなくなったということもあり、Trailheadをはじめました。

やってみると、結構楽しいw

今のところのスコアはこんな感じ。

  • ランク: Expeditioner
  • バッジ: 54
  • ポイント: 44,650
  • トレイル: 3

トレイルが少ないのは、手を付けやすそうなモジュールからつまみ食いをしていたためだと思われます。

もうちょっと体系立てて選んでいけばよかったなと反省。。

すすめ方

勉強時間は、平日は22時~1時くらい。休日は日によってまちまちですが、多くても6時間くらい。(大体ドラクエ11をやっていた時間ですね
飽きっぽい性分ですし、このペースを維持するのは無理でしょうが、息切れした後はマイペースにやっていこうかと思ってます。

また、50バッジごとにSuperbadgeにチャレンジしていく、みたいな自分ルールを定めてます。予定だとこんな感じ。

バッジ数 superbadge
50 Apex Specialist
100 Reports & Dashboards Specialist
150 Security Specialist
200 Lightning Experience Specialist
250 Lightning Experience Rollout Specialist
300 Data Integration Specialist

もう一つの自分ルールとして、クリアすると決めたモジュールは必ずクリアして次のモジュールに移る、という風にしています。

Trailheadのよいところ

  • 業務で触る機会のない製品についても学べる
  • 難易度や所要時間など様々なモジュールが自由に選択できる
  • ゲーミフィケーションの要素を取り入れていて、バッジを集めていくのが楽しい
  • 分かったつもりの単元でも、意外と知らない気づきがある

改善点(?)とか

  • 説明は日本語訳されているのに、演習はすべからく英語なのが辛い
  • キャプチャ画像の文字が小さすぎて読めないことがあるので、クリックで拡大されると嬉しい(画像を別タブで開くと大きいキャプチャが見られるのに気づきませんでした…)
  • 演習で失敗した場合のメッセージが分かりにくい時がある

【一休 × JapanTaxi】サービスを支えるデータ分析基盤に行ってきた #ikyu_japantaxi

8/17(木)に開催された「【一休 × JapanTaxi】サービスを支えるデータ分析基盤」というイベントに行ってきました。

ikyu.connpass.com

資料は上記ページに公開されていますが、一応メモ。

一休.comを支えるデータ分析基盤

実践的な内容でよかったです。
AWS + GCP + Azureというマルチクラウドな分析基盤でした。

  • データ分析基盤環境を再構築した話
    • Before) データ分析を行う環境と本番環境が分断されていた(1日1回しかETLできないなど)。ディスク枯渇のおそれ。
    • After) ログ基盤 + データ分析基盤をクラウドで再構築
  • RedShift上に構築したところ、移行コストが高かった…(元々はSQLServerベース、RedShiftはPostgreSQLベース)
  • Azure SQL Data Warehouseを使うことに。
  • ログ基盤(ログ=ユーザのセッションログ・行動ログ)
    • JavaScript(Ajax) / fastly / API / Kinesis / Lambda / DynamoDB / 基幹DB
    • API部分はGoで実装(t2.small * 4で安定稼働中)
      • API Gateway & Lambdaも検討したが、性能面、安定面でGoがよかった
    • ログ集約部分はKinesis -> Lambda -> Cloud Storage
      • 相性は抜群
  • データ分析基盤
    • AWS(Kinesis / Lambda), GCP(Cloud Storage / BigQuery), Azure(Storage Blob / SQL Data Warehouse)というマルチクラウド構成
    • Lambda -> Azureを直でつなぐのは、Azureの制約でできなかった
  • 活用事例
    • KPI集計, CRM施策
    • 1 to 1マーケティング
      • Nヵ月ぶりにログインしたユーザにクーポンとか
      • プライスダウン通知
  • 今後
    • DynamoDBでリアルタイムに何かしたい(BigQueryだと8Hのタイムラグがある)

質疑応答

Q. 機微な会員情報のマスクは?
A. 会員データはMS SQL Server上にだけ存在する。ユーザにはvisitor IDが割り振られていて、紐付け情報だけDWHで持っている構成。

Q. コスト面やデータ伝送料について
A. BigQueryは安い。データ伝送料についてもそこまで高くはついていない。 ただ、RedShiftは1年のリザーブインスタンスで契約したものの、結局使わなくなったので、その分のコストは無駄になってしまった。。

Q. ログのトランザクション
A. 10,000弱/分。これでAPI(Go)のCPU負荷は7%くらい。

タクシー会社を支えるデータ収集技術 〜AWS Kinesis Stream/IoT の導入事例〜

Kinesisや動態管理など、自分の仕事と共通するところも多かったので、興味深く聞かせていただきました。

  • 従来:タクシー⇔配車サーバ⇒動態DB
    • 動態DB: 緯度、経度、ステータス(空車など)を管理するDB
    • 個社ごとに動態DBがあり、BigQuery、Embulkで動態DBをつないでいた(活用されていない動態DBがあったりする)
  • リアルタイムに、過去のデータも、簡単に、便利に、安全にアクセスできるデータ収集基盤が必要になった
  • Kinesis / Lambda / S3 / Athena で構成 ⇒ 失敗。。
    • Kinesisへの書き込みでレート超過が発生
  • ELB / Collector Server / Kinesis / Lambda / S3 / Athena
  • 話変わってAWS IoTの話
    • バイスの監視には細かいロギングが必要。消費電力を抑えるにはスリープを増やす必要がある ⇒ 対立する要求
    • AWS IoTを使うことでデバイスのロジックをクラウドに移すことができる

LT1 3分で作るストリーム処理基盤 ~Kafka + Flink on Docker編~

FlinkはオフィシャルのDockerイメージが、Kafkaも非公式だがDockerイメージがあるので、docker-compose upで簡単に基盤を作ることができる、という内容でした(デモ付)

LT2 クラシルを支える分析基盤 を支える話

社内で行っているSREエンジニア育成プログラム(アポロ)の話。
SRE本、最近購入したのですがまだ読めてないので、ちょっとずつ消化していかなきゃなぁと…。

  • 本番の障害対応は未経験者には難しく、経験者にだけ経験が集まり、未経験者(新しいSREエンジニア)が育たない悪循環。
  • ミッション1 本番環境と同党のログ収集基盤を構築(Fluentd, BigQuery, re:dash)
  • ミッション2 障害対応
    • ポイント1 過去に実際に起きたインシデントを元に再現
    • ポイント2 障害対応中のタイムラインを記録

LT3 SpinAppを支えるデータ収集基盤

EC2とRedShit中心とデータ収集基盤をGCPで作り直した話でした。結果的にコストは1/8になり、BigQueryも早くて安くてよかったとのこと。

ただ、エラーを返すのが多かったり、体感での障害が(AWSに比べて)多かったとのことでした。実際どうなんでしょうね…。

LT4 b→dashのデータ処理基盤

データ処理基盤のアーキテクチャ刷新のお話でした。
システム拡大に伴ってマイクロサービス化をしたり、データ容量の増大に伴ってスケーラビリティが容易な構成にしたり。
一方で、個社カスタマイズも必要になるとのことで、共通の部分と個別の部分を切り分けで設計されていたとのことでした。


というわけで、いろんな会社のデータ分析基盤について、実践的なアーキテクチャの話を聞くことができた勉強会でした。あと、会場の一休さんも自社から近いため、行きやすくてよかったです。
RedShiftはいいぞ、みたいな話がほぼなく、むしろBigQueryを使ってAWSGCP組み合わせたケースが多いんですね。

聞いた内容については、うまくプロジェクトにも還元していきたいです。

主催者、発表者、参加者の皆さま、おつかれさまでした。

Spring BootのRedisCacheManagerでJSONをいろんな型で利用する

Spring Bootのキャッシュ抽象化(Redis)でJSONを利用したい場合のメモです。

■ 目次

そもそもなんでJSON?

Redisの中身を直接調べる場合に、ヒューマンリーダブルであることが大きな理由です。デフォルトであるJavaのオブジェクトシリアライズ文字列だと、ほとんど中身が分からないので。

Configuration

先に結論ですが、Configurationは以下のようになりました。

やっていることは、

  • デフォルトとJSON用の2つの CacheManager を登録 *1
  • シリアライズでエラーがあった場合、無視して直接メソッドが呼ばれるように、カスタムのエラーハンドラを登録

の2つです。

@Configuration
@EnableCaching
public class CacheConfig extends CachingConfigurerSupport {

    /**
     * デフォルトのCacheManager
     */
    @Bean
    @Primary
    CacheManager cacheManager(RedisTemplate<String, Object> redisTemplate) {
        Map<String, Long> expires = new HashMap<>();
        // 略: キャッシュの有効期間を設定
    
        RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
        cacheManager.setExpires(expires);

        return cacheManager;
    }

    /**
     * JSON版のCacheManager
     */
    @Bean(name = "JsonCacheManager")
    CacheManager jsonCacheManager(RedisConnectionFactory connectionFactory) {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new JavaTimeModule()) // LocalDateTime などの Date and Time API 関連のフィールドを扱う 
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // 不明なプロパティがあっても無視
                .enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class"); // デコード時の型の解決(後述)

        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setKeySerializer(new StringRedisSerializer()); // キーはシリアライズせずにただの文字列にする
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer(mapper)); // 値はJSONエンコード・デコードを利用する
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.afterPropertiesSet();

        Map<String, Long> expires = new HashMap<>();
        // 略: キャッシュの有効期間を設定

        RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
        cacheManager.setExpires(expires);
        return cacheManager;
    }

    @Override
    @Bean
    public CacheErrorHandler errorHandler() {

        return new SimpleCacheErrorHandler() {

            @Override
            public void handleCacheGetError(RuntimeException exception, Cache cache, Object key) {
                // キャッシュからのデシリアライズに失敗した場合は、エラーにはせずに対象のメソッドをそのまま呼ぶ(その結果はキャッシュされる)
                if (isSerializationError(exception)) {
                    return;
                }

                throw exception;
             }

            private boolean isSerializationError(RuntimeException exception) {
                if (exception instanceof SerializationException) {
                    return true;
                }

                Throwable cause = exception.getCause();
                return (cause != null && cause instanceof SerializationException);
            }
        }
    }
}

Jackson2JsonRedisSerializerとGenericJackson2JsonRedisSerializerの違いについて

CacheConfig の中では、 value serializer(Redisへの値の書き込み・読み込み手段)として GenericJackson2JsonRedisSerializer を使いました。

そもそもRedisTemplate に渡すJSON 用の value serializer には2種類あります。

  • Jackson2JsonRedisSerializer
  • GenericJackson2JsonRedisSerializer

両者の違いは、シリアライズ・デシリアライズの対象の型にあります。前者は特定の型に対してのものであり、後者はいろんな型( java.lang.Object とそのサブクラス)で使えます。
インスタンス作成方法の違いを見ると分かりやすいかもしれません。

Jackson2JsonRedisSerializer<Entity> x = new Jackson2JsonRedisSerializer<>(Entity.class);
x.setObjectMapper(mapper);

GenericJackson2JsonRedisSerializer y = new GenericJackson2JsonRedisSerializer(mapper);

Jackson2JsonRedisSerializer はコンストラクタ中でクラスを指定する必要があることから、結果的に、キャッシュが返す型ごとに CacheManager を用意する必要があり、煩雑になります。JSON用の CacheManager は1つにしたいので、 GenericJackson2JsonRedisSerializer を使っています。

ちなみに、

Jackson2JsonRedisSerializer<Object> x = new Jackson2JsonRedisSerializer<>(Object.class);

とすればいいじゃん?と思うかもしれませんが、これは、結局 GenericJackson2JsonRedisSerializer を使った場合と同じ挙動になります。

GenericJackson2JsonRedisSerializerを使うと、戻り値がLinkedHashMapになり、呼び出し時にキャストエラーになる

さてここで一つ問題があります。単純にGenericJackson2JsonRedisSerializerを使っただけだと、キャッシュから値が取得できた場合に戻り値が LinkedhashMap になってしまいます。
そうすると、メソッドを呼び出して結果を代入する箇所で ClassCastException(実行時エラー)が起きてしまいます。

具体的な例を示します。キャッシュを使ったメソッドの実装を以下のようにした場合 *2 に、

@Service
@RequiredArgsConstructor // Lombok
@Transactional(readOnly = true)
public class HogeService {

    private final HogeRepository hogeRepository;

    @Cacheable(value = "aaa", key = "'example:hoge:' + #key", cacheManager = "JsonCacheManager")
    public Hoge findByKey(String key) {
        return hogeRepository.findByKey(key);
    }
}

呼び出し側は

Hoge hoge = hogeService.findByKey("xxx");

となりますが、ここで、 LinkedHashMap から Hoge へのキャストが発生し、 ClassCastException が起きます。

キャッシュ内のJSON中に、型解決のためのヒントとなるプロパティを埋め込むようにする

上記の問題点の解決方法です。

まず、Spring BootというよりJacksonの話になりますが、デコード時の型として java.lang.Object を指定した場合、戻り値の型は、JSON文字列に応じて以下のようになります。

  • “1” とか “5.3” などの数値の場合 => IntegerDouble
  • “[ 3, 4 ]” などのリストの場合 => ArrayList
  • “{ "aaa” : “nnn” }“ のようなオブジェクトの場合 => LinkedHashMap

GenericJackson2JsonRedisSerializer の実装では、java.lang.Object を使ってデコードをしようとしていることから、キャッシュから値が取得できた時に LinkedHashMap になってしまい、戻り値の型 Hoge へのキャストが発生しているわけですね。

java.lang.Object ではなく、戻り値の型を使ってデコードしてくれればいいのに…」と思うわけですが、残念ながら RedisSerializer のデシリアライズのメソッドシグネチャは、

    T deserialize(byte[] bytes) throws SerializationException;

のようになっており、メソッドの引数から、戻り値の型を解決することができません。

じゃあどうすればよいか?と思うわけですが deserialize メソッドの引数はデコード対象のバイト列(JSON文字列)だけです。 なので、これを唯一の手がかりとして、JSON文字列の中に、具体的な型を解決するためのヒントを埋め込んであげればよさそうです。

キャッシュから値を読み込む時に、型情報を参照にする

CacheConfig 中の以下のコードが該当します。

objectMapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class"); 

これは「デコード時に指定された型が java.lang.Object の場合、JSON文字列中の “@class” プロパティをヒントにして型を解決してね」という意味です。なお、 “@class” の値には、クラスのFQCN文字列がセットされます。

ちなみに、JSON文字列中に “@class” プロパティが存在しない場合はエラーとなります。キャッシュの中身が不正なだけで、呼び出すたびにエラーになってしまうのは不便なので、カスタムエラーハンドラを作り、 SerializationException の時は無視してメソッドの呼び出しを行なうようにしています。

キャッシュに値が書き込む時に、型情報を付与する

クラスに @JsonType アノテーションを付けてあげます。

package sample;

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
public class Hoge {
    // 中身は省略
}

これにより、JSONエンコード時、すなわちキャッシュに値を書き込む時に、 @class というプロパティに “sample.Hoge” が付与されて書き込まれます。

まとめ

Spring BootのRedisCacheManagerでJSONをいろんな型で利用する場合のまとめです。

  • CacheManager を用意し、 value serializer に GenericJackson2JsonRedisSerializer を与える
  • キャッシュの戻り値として利用するクラスには @JsonTypeInfo を付けてあげる
  • GenericJackson2JsonRedisSerializer にセットする ObjectMapper には、型を解決するためのヒントを enableDefaultTypingAsPropertysetDefaultTyping メソッドで指定してあげる
  • キャッシュのデシリアライズに失敗した場合に備えて、 `CachingConfigurerSupport#errorHandler をオーバーライドしてカスタムのエラーハンドラを作り、デシリアライズエラー時はキャッシュを諦めてメソッドを呼ぶようにした方が、(おそらく)安全

*1:複数の CacheManager を統合する CompositeCacheManager というのもありますが、今回は簡略化のため複数利用しました。

*2:キャッシュの使い方としてちょっと古いバージョンの実装かもしれません。。

エラー処理を考慮したコールアウト機構としてプラットフォームイベントが使えないか? #salesforce

昨日の続き。

jappy.hatenablog.com

Salesforce to 外部ソース」のデータ連携(HTTPコールアウト)時に、プラットフォームイベントが有効に使えないか?と、ふと考えました。

この手の話では、これまでは @futureQueueable Apexを使うのが常套手段だったかと思います。ただ、コールアウトが絡んだアーキテクチャ設計においては、単発でコールできればそれでOKというわけではなく、送信失敗時のエラーハンドリングやリトライについても考える必要があります。

例えば以下の記事です。

developer.salesforce.com

上記の記事の趣旨は「 @future よりも Queueable を使おう」というものですが、コールアウト処理のステータスを保持するカスタムオブジェクト ( Callout__c )を作って、実行履歴を残したり後々のリカバリができるような余地を残している、というのが一つ注目すべきポイントです。

これに近いことがプラットフォームイベントでもできるんじゃないかなーと思ってます。
保持期間が24時間なようなので、履歴の保持はできないかもしれませんが。

Summer '17で正式リリースされたプラットフォームイベントの有効な使い方を考えてみた #salesforce

Salesforce Summer ‘17でプラットフォームイベント(エンタープライズメッセージングプラットフォーム)なる機能が正式リリース されました。

新機能については、以下の動画で分かりやすく説明されてます。

Summer ‘17 開発者向け新機能 - YouTube

動画を見ると、このプラットフォームイベントという機能、なんとなく便利そうだなーという気がしてきます。しかし、返品リクエストのデモ(00:50あたり)だけだと、その便利さについてまだ腹落ちできませんでした*1

  • 今まででも同じようなアプリは作ろうと思えば作れるよなぁ。
  • カスタムオブジェクトでいいんじゃない?

といった疑問が残ります。

そこで、「あぁたしかにこれは便利だ!」と思えるような有効な使い方がないか、考えてみました。

免責事項

プラットフォームイベントはメッセージングの機構なので、メッセージをやり取りする主体には発行側(Pub)、購読側(Sub)があり、Pub/Subの組み合わせには以下の4パターンがあります。

Pub Sub
Salesforce Salesforce
Salesforce 外部ソース
外部ソース Salesforce
外部ソース 外部ソース

今回は3番目の、Pub=外部ソース、Sub=Salesforceのシナリオに絞った内容です。他のパターンでも(多分)有効な活用シーンが色々あるでしょう。

また、今回の話ですが、思いついたアイディアをメモ用に記事にしているだけなので、実現性については未検証です。そのうち試します(きっと)

【活用例1】大量のレコードを1個のイベントに詰めて登録し、SOAP APIコールの制限を抜ける

1個目は、「大量のレコードを最低限のAPI消費でSalesforceに登録したい」という要望を満たすために、複数のレコードをシリアライズして1個のプラットフォームイベントに詰めてPublishし、Salesforce(Subscriber)側でデシリアライズをしてからレコードを登録していく、というアプローチです。

そもそもSOAP APIコールの場合、createやupdateは1回で200レコードまでです。

developer.salesforce.com

それ以上のレコードを登録したい場合、APIコールを分けるか、Bulk APIの利用を検討する必要があります。APIコールを分ける場合、当たり前ですがAPIを余分に消費してしまいます。Bulk APIは、結果の評価を非同期で行う必要がありますし、扱いに癖があります。

やり方はこんな感じ。

  1. (Salesforce) ロングテキストエリア側のカスタム項目をもつプラットフォームイベントを作成
  2. (外部ソース) 外部ソースからAPIを利用してプラットフォームイベントをPublishする。その際、ロングテキストエリア項目に登録したいレコードをJSONシリアライズして詰めておく。
  3. (Salesforce) プラットフォームイベントのApexトリガの処理で、ロングテキストエリア項目をJSONシリアライズし、元のレコードを復元する。
  4. (Salesforce) 復元したレコードをINSERT/UPDATEする。

AWSKinesisでいう、Aggregation & Deaggregationライブラリと同じ発想ですね。割と常套手段なんでしょうか…。

【活用例2】活動(行動/ToDo)の関連先を動的に割り当てる

2個目は、「外部から登録する活動の関連先を動的に割り当てる」という要望を満たすために、直接 Event を登録するのではなく、中間にプラットフォームイベントを挟む、というアプローチです。

そもそも活動を外部からAPIで登録する場合、関連先(WhatId)項目の指定が必須であり、かつ、関連先にはSalesforce IDをセットしなければなりません。外部IDが使えないのは意外と不便だったりします。おそらく、どのオブジェクトに対する活動か、というのを術がないからだと思いますが。

間にプラットフォームイベントを挟んでしまえば、Apexトリガやプロセスビルダーで動的に関連付けができるのでは?と考えました。

まとめ

どちらの活用例にも共通して言えることは、直接 create/update するのではなく間にプラットフォームイベントをクッションとして挟むことで、柔軟性を持たせられる、ということですね。そう考えると、色々便利に使えそうな気がしてきます。

*1:ちなみにTrailheadはまだやってません…。

DynamoDBのTTL機能とLambdaを使ってサーバレスでのタイマー実行処理ができないか考えてみた

バッチ処理の中には、定期実行ではなく、特定の時刻に実行したい、みたいなものがあります。
例えば、今から5分後とか3日後の12:00とか。ジョブスケジューラでよくある感じのやつです。

AWSで、サーバレスなアーキテクチャ(実行基盤はAWS Lambda *1)でこの処理を実現する方法について考えました。

定期実行の場合

そもそも単純な定期実行の場合は、以下に書かれているように、CloudWatch Eventsを使うことで実現できます。

スケジュールされたイベントでの AWS Lambda の使用 - AWS Lambda

ただこの仕組みは「Rate または Cron を使用したスケジュール式 - AWS Lambda」で表現可能なスケジュール実行しかできないので、起動したい処理(タスク)がいっぱいあり、それらの指定開始時刻がバラバラな場合、使うのは難しそうです。

タイマー実行(スタンダード?な方法)

スタンダードなやり方だと、以下のどちらかでしょうか。

  • 処理タスクをキューに貯めて、定期的にポーリング
  • EC2などでQuartzのようなスケジューリング用のライブラリを使う

タイマー実行(DynamoDB)

今回考えた方法では、DynamoDBのTTL(Time to Live)機能を利用します。アイテムに設定された時刻(有効期限)になると、アイテムが削除されます。TTLを使った利用シーンとしては、キャッシュ、セッション管理、不要になった古いアイテムのパージなどですかね。

新機能 – TTL(Time to Live)機能を利用したDynamoDBアイテムの管理について | Amazon Web Services ブログ

重要なのは、TTLによってアイテムが削除された場合も、通常の追加・更新・削除と同様にDynamoDB Streamのトリガ(Stream)でイベントを発火させ、Lambdaで処理させることができる点です。

そのため、有効期限(TTL)として処理を開始したい時刻を設定することで、指定時刻でLambdaが実行できると考えました。

というわけで、簡単に試してみました。

で、結論ですが、「指定した時刻で」という条件を守るのは難しそうです。うーん、現実は厳しい。。
というのも、以下のドキュメントに書かれているとおり、DynamoDBの有効期限は、その時刻になったら削除されるというものではなく、ベストエフォートベース(具体的にいつ削除されるかは分からない)んですね。

docs.aws.amazon.com

今回は新規にテーブルを作って試したのですが、有効期限を00時26分35秒に作ったアイテムが実際に削除されたのは00時38分58秒(約12分後)でした。

Amazon Simple Workflow Service (SWF)

ここまで書いてAmazon Simple Workflow Service (SWF)の存在を思い出しました。
個人的にまだ使ったことはなく、名前にSimpleというワードを含んでいながらAWSの中でもトップクラスに複雑なサービスというイメージなんですが、どうなんだろう…。

*1:AWS Lambda単体で難しい場合、例えば、処理が複雑であったり、タイムアウトが起きるくらい実行時間がかかる場合は、AWS Step FunctionsやAWS Batchを使う(組み合わせる)イメージです。