Kafka ❤️ OpenAI - Mit ChatGPT Kafka Topics in Echtzeit filtern
KI und Streaming sind ein Traumpaar.
Echtzeit Datenströme mit KI in Echtzeit analysiert, strukturiert und kategorisiert bieten enorme Möglichkeiten, die heute noch vergleichsweise schwer umzusetzen sind. Ich mache aber heute die kühne Aussage, dass sich das in den nächsten 6 Monaten ändert.
Jeden Mittwoch öffnen die Nebulit Research Labs ihre Tore für einen Abend und wir probieren neue Dinge aus.
Diesmal verheiraten wir Künstliche Intelligenz und die Open AI API mit Apache Kafka um Records in Echtzeit zu filtern. Was für eine wunderschöne Hochzeit. ❤️
TLDR - was du brauchst um den Use Case zu starten.
Du brauchst einen OpenAI API Key (kostenloser Probeaccount ist ausreichend)
Du brauchst ein lokal installiertes Docker.
Um den kompletten Use Case zu starten reicht es, das Projekt in deine IDE zu importieren und den Test OpenAiKafkaApplicationTests zu starten.
Der Test startet ein lokales Kafka, verbindet sich mit der OpenAI Schnittstelle und sendet 10 zufällig generierte Test Records die von OpenAI analysiert werden.
Records die von der KI als verdächtig eingestuft werden, werden direkt herausgefiltert.
@SpringBootTest(classes = [TestOpenAiKafkaApplication::class]) @Testcontainers class OpenAiKafkaApplicationTests { @Autowired private lateinit var kafkaTemplate: KafkaTemplate<String, Payment> @Test fun contextLoads() { (1..10).forEach { kafkaTemplate.send("test-topic", "test", Payment().apply { this.age = Random.nextInt(from = 5, until = 40) this.amount = BigDecimal(Random.nextDouble(from = 5.toDouble(), until = 999.toDouble())) }) } Awaitility.await().pollDelay(Duration.ofMinutes(2)) .timeout(Duration.ofMinutes(3)) .untilAsserted { assertTrue(true) }; } }
Für die Analyse nutzen wir einfache Zahlungsdaten in einem Online-Shop. Analysiert wird der Betrag und das Alter des Käufers. Eine Zahlung ist dann verdächtig, wenn der Betrag höher ist, als das was normalerweise für dieses Alter üblich ist ohne dies exakt zu spezifizieren.
class Payment { var amount: BigDecimal = BigDecimal(0) var age = 16 }
Die Prompt die wir hierfür verwenden ist in der application.properties hinterlegt.
You are a payment analyzing software.
You will get requests in json containing age and amount.
Return parseable JSON in the form : {\"suspicious\": true|false,\"reasoning\":\"<your reasoning>\"} if the payment is suspicious.
Assume kids have the average amount of pocket money in germany.
Give your reasoning in the 'reasoning' field"
Das Projekt
Wir nutzen Spring Boot und Spring Kafka mit der Consumer API.
(In den nächsten Experimenten schauen wir uns die Integration mit Kafka Streams und KSQL an)
Für die Kommunikation mit OpenAPI nutzen wir die einfache HTTP Schnittstelle.
@Component class OpenAiConnector(@Value("\${open-ai.url}") var openAI:String, @Qualifier("open-ai-template") var restTemplate: RestTemplate) { fun requestResponse(request: ChatRequest): String? { measureTimeMillis { val result = restTemplate.postForObject(openAI, request, ChatResponse::class.java) return result?.choices?.firstOrNull()?.message?.content } } }
Für das Filtern der Records nutzen wir eine RecordFilterStrategie. Dies ist genau die richtige Stelle um Records zu filtern bevor sie an die eigentlichen Consumer weitergeleitet und verarbeitet werden.
Die Logik basiert auf dem Attribut suspicious aus der Response der OpenAPI Schnittstelle und dient dazu, diesen Zahlungsvorganga als verdächtig einzustufen.
Sollte dies der Fall sein wird das Record nicht weiter verarbeitet.
class OpenAiRecordFilterStrategy(var advice: String, var openAiConnector: OpenAiConnector) : RecordFilterStrategy<String, Payment> { override fun filter(record: ConsumerRecord<String, Payment>): Boolean { val result = jacksonObjectMapper().readValue( //call openAPI openAiConnector.requestResponse( //build prompt from record buildPromptFromRecord(record) ), Response::class.java ) // use open ai provided information if (result.suspicious) { println("Suspicious record filtered out. Reason: ${result.reasoning}") // true means, record is filtered an not processed any further. return true } // process record regularly return false } }
Das Ergebnis
Der Ansatz ist natürlich naiv. Die Performance der Schnittstelle wie wir sie hier verwenden eignet sich keinesfalls für Echtzeitverarbeitung, darum gehts aber hier aktuell nicht.
Die durchschnittliche Antwortzeit beträgt etwa 6 Sekunden.
Pro Call brauchen wir etwa 150 Tokens (Token ist die Einheit, die für die Abrechnung der API verwendet werden.) Grob 4 Characters entsprechen einem Token.
Für 1000 Tokens werden 0.0010 $ berechnet. Würden wir also 10k Calls pro Tag machen wäre unser Budget pro Tag dafür (ganz grob) bei 1,50 $
Die Flexibilität ist aber interessant.
Die Prompts sind ausgelagert und könnten auch über eine WebAPI dynamisch in die Applikation gegeben werden.
Dadurch können Event-Ströme per einfacher Textanweisung gefiltert werden.
Use Cases
- Record können wie hier dynamisch gefiltert werden.
- Records können auf Basis der Anweisungen in verschiedene Topics geleitet werden.
- Datenbereinigung on the fly
- Records können zur Laufzeit um Metadaten aus der KI Analyse angereichert werden (machen wir im nächsten Experiment)
Fazit
Echtzeit-Daten mit Kafka und KI sind hochinteressant und die Use Cases sind beinahe unbegrenzt. Wir werden uns in den nächsten Wochen noch einige weitere Use Cases anhand verschiedener Beispielprojekte anschauen und weiter in unseren Labs experimentieren.
Wir reden immer gerne über Kafka, Streaming und die Integration dieser Komponenten. Falls du mit Kafka arbeitest und Erfahrungen austauschen möchtest, buch dir gernen einen Call.
Neugierig geworden? Mit unserer Expertise im Bereich Kafka, Kotlin und Spring bieten wir die Möglichkeit ihre Entwicklung zu beschleunigen und produktiver zu machen.
Ein unabhängiges Architektur-Review schafft Klarheit über den Status Quo.
In unserem kostenlosen Kennenlern-Call hast Du zudem die Möglichkeit, kostenlos erste Fragen zu platzieren.