20 просмотров
Рейтинг статьи
1 звезда2 звезды3 звезды4 звезды5 звезд
Загрузка...

Разбираемся с многопоточностью в RxJava

Лекция 3 по архитектуре андроид приложения. Знакомство с RxJava

Третья лекция Курса по архитектуре клиент-серверных android-приложений, в которой мы познакомимся с RxJava и основными операторами, а также узнаем, как создавать Observable, преобразовывать потоки данных, работать с RxJava в Android и решать проблему Backpressure.
Ссылки на исходный код ваших решений вы можете оставлять в комментариях. Делитесь вашими решениями с сообществом, получайте обратную связь и конструктивную критику. Лучшие решения будут опубликованы на нашем канале и сайте fandroid.info с указанием авторства победителей!

Введение

Сейчас мы знаем немало способов выполнять какую-то работу в фоновых потоках. Под работой здесь чаще всего понимаются серверные запросы. Но насколько все хорошо с этими способами? Нужно признать, что у них немало недостатков.

Во-первых, эти способы не гибкие. Допустим, нужно выполнить два запроса параллельно, а потом, дождавшись, когда оба завершат работу, выполнить третий запрос с использованием результатов предыдущих запросов. Если вы попробуете реализовать такую задачу с использованием лоадеров, то скорее всего у вас получится большое количество булевских флагов, полей для сохранения результата и много кода. К проблемам гибкости можно также отнести и то, что сложно хорошо реализовать периодическое обновление данных.

Во-вторых, неудобно обрабатывать ошибки. Опять же, в случае лоадеров мы можем вернуть только один результат. Конечно, можно использовать специальные классы, которые будут служить как для передачи данных, так и для передачи ошибок, но это неудобно.[wpanchor id=»2″]

Именно поэтому нам нужно рассмотреть другие возможности для выполнения сетевых запросов и для работы с данными. И в первую очередь такой возможностью является популярный фреймворк RxJava.

RxJava

Фреймворк RxJava позволяет использовать парадигму функционального реактивного программирования (FRP) в Android. Понять, что это значит, весьма непросто, поэтому требуется немало пояснений. Во-первых, слово функциональное означает то, что в FRP основным понятием являются функции и в этом эта парадигма схожа с обычным функциональным программированием. Конечно, в Android весьма затруднительно использовать полноценное функциональное программирование, но все же с помощью RxJava мы смещаем приоритет от объектов к функциям. Во-вторых, реактивное программирование означает программирование с асинхронными потоками данных. Проще всего это пояснить на практике: поток данных – это любой ваш запрос на сервер, данные из базы, да и обычный ввод данных от пользователя (если говорить совсем уж откровенно, то создать поток данных можно абсолютно из чего угодно), который чаще всего выполняется в фоновом потоке, то есть асинхронно. Если объединить два объяснения, то получим, что функциональное реактивное программирование – это программирование с асинхронными потоками данных, которыми можно манипулировать с помощью различных функций.

Определение звучит красиво, но только совсем непонятно, зачем это может понадобиться. Оказывается, еще как может. RxJava позволяет решить почти все проблемы, которые были озвучены во введении. В качестве основных преимуществ RxJava выделяются следующие:

  • Обеспечение многопоточности. RxJava позволяет гибко управлять асинхронностью выполнения запросов, а также переключать выполнение операций в различные потоки. Кроме того, что немаловажно для Android, RxJava также позволяет легко обрабатывать результат в главном потоке приложения.
  • Управление потоками данных. Это позволяет преобразовывать данные в потоке, применять операции к данным в потоке (к примеру, сохранять их данные из потока в базу), объединять несколько потоков в один, изменять поток в зависимости от результата другого и многое другое.
  • Обработка ошибок. Это еще одно очень важное преимущество RxJava, которое позволяет обрабатывать различные ошибки, возникающее в потоке, повторять серверные запросы в случае ошибки и передавать ошибки подписчикам.

И самое приятное, что все преимущества выше достигаются буквально за пару строчек кода!

Использовать RxJava непросто, а использовать ее правильно еще сложнее, и это требует достаточно долгого изучения. RxJava – это очень большой фреймворк, чтобы правильно работать с ним (и в частности с парадигмой функционального реактивного программирования), нужно очень много изучать и практиковаться. RxJava достойна отдельного курса, по ней существует огромное количество статей, и их число увеличивается с каждым днем. По RxJava написана уже не одна книга, поэтому нельзя надеяться, что можно хорошо изучить этот фреймворк в рамках одной лекции. Но мы постараемся рассмотреть основные возможности RxJava и то, как ее можно применить для Android.

Мы немного рассмотрели суть RxJava и функционального реактивного программирования, но чтобы продвинуться дальше, нам нужно знать основные элементы этого фреймворка, к примеру, что подразумевается под потоком данных и как его создать, что такое подписчик и как его использовать и так далее.

Читать еще:  OpenNews: Релиз web-браузера Chrome 72

RxJava в качестве своей основы использует паттерн Observer. Как выглядит этот паттерн в классическом виде? Основными сущностями в нем являются объект, значение которого может изменяться, и подписчик, который может подписаться на эти изменения (каждый раз при изменении значения объекта у подписчика будет вызван определенный метод). Схематично это можно представить следующим образом:

Суть RxJava почти такая же, только вместо одного объекта подписчики используют целый поток данных. Подписчик может подписаться на поток данных, и тогда он будет получать информацию о каждом новом элементе в потоке, о произошедших ошибках, а также о завершении потока.

Тогда схема для RxJava будет такой:

С подписчиком все более менее понятно, а что же такое поток данных? Поток данных – это всего лишь набор каких-то элементов (необязательно конечный), которые передаются подписчику. В качестве потока данных могут выступать как простые объекты и последовательности, так и бесконечные, последовательности и различные события, к примеру, ввод данных.

Рассмотрим один из примеров, который поможет понять, где именно встречаются потоки данных и как в работе с ними может помочь RxJava. Допустим, стоит задача искать людей с определенным именем, когда пользователь вводит текст в поле поиска. Ее можно решить следующими простыми строчками кода:

Разница между потоками Java 8 и наблюдаемыми RxJava

являются ли потоки Java 8 похожими на наблюдаемые RxJava?

определение потока Java 8:

классы в новом java.util.stream пакет предоставляет API потока для поддержка операций функционального стиля в потоках элементов.

7 ответов

TL; DR: все библиотеки обработки последовательности / потока предлагают очень похожий API для строительства трубопровода. Различия заключаются в API для обработки многопоточности и состава трубопроводов.

RxJava сильно отличается от Stream. Из всех вещей JDK, ближе всего к rx.Наблюдаемые возможно java.утиль.поток.Коллекционер!–24–> Stream + CompletableFuture combo (который поставляется за счет работы с дополнительным слоем монады, i. e. необходимость обработки преобразования между Stream > и CompletableFuture > ).

существуют значительные различия между Observable и Stream:

  • потоки основаны на вытягивании, наблюдаемые-на толчке. Это может показаться слишком абстрактным, но это имеет существенные последствия, которые очень конкретны.
  • поток может использоваться только один раз, наблюдаемый может быть подписан на много раз
  • Stream#parallel() разбивает последовательность на разделы Observable#subscribeOn() и Observable#observeOn() нет, это сложно подражать Stream#parallel() поведение с наблюдаемым, когда-то было .parallel() метод, но этот метод вызвал столько путаницы, что .parallel() поддержка была перемещена в отдельный репозиторий на github, RxJavaParallel. Более подробная информация в другого ответа.
  • Stream#parallel() не позволяет указать используемый пул потоков, в отличие от большинства методов RxJava, принимающих необязательный планировщик. С все экземпляры stream в JVM используют тот же пул fork-join, добавляя .parallel() может случайно влияют поведение в другом модуле программы
  • потокам не хватает операций, связанных со временем, таких как Observable#interval() , Observable#window() и многие другие; это в основном потому, что потоки основаны на pull
  • потоки предлагают ограниченный набор операций по сравнению с RxJava. Например. В потоках отсутствуют операции отключения ( takeWhile() , takeUntil() ); обходной путь с помощью Stream#anyMatch() ограничено: это терминальная деятельность, поэтому вы не можете использовать ее больше чем раз в поток
  • по состоянию на JDK 8, нет операции Stream#zip, что иногда очень полезно
  • потоки трудно построить самостоятельно, наблюдаемые могут быть построены многими способамиEDIT: как отмечено в комментариях, существуют способы построения потока. Однако, поскольку нет не-терминального короткого замыкания, вы не можете e. г. легко генерировать поток строк в файле (JDK предоставляет файлы#lines и BufferedReader#lines из коробки, хотя и другие подобные сценарии могут быть управляется путем построения потока из Iterator).
  • Observable предлагает средство управления ресурсами ( Observable#using() ); вы можете обернуть поток ввода-вывода или мьютекс с ним и быть уверены, что пользователь не забудет освободить ресурс – он будет автоматически удален по окончании подписки; поток имеет onClose(Runnable) метод, но вы должны вызвать его вручную или через try-with-resources. Е. Г. вы должны иметь в виду, что файлы строки#() должны быть заключенным в try-with-resources блок.
  • наблюдаемые синхронизируются полностью (я на самом деле не проверял, верно ли то же самое для потоков). Это избавляет вас от мысли, являются ли основные операции потокобезопасными (ответ всегда “да”, если нет ошибки), но накладные расходы, связанные с параллелизмом, будут там, независимо от того, нужен ли ваш код или нет.

Round-up:RxJava значительно отличается от потоков. Реальными альтернативами RxJava являются другие реализации ReactiveStreams, электронной. г. соответствующей части Акко.

Читать еще:  Vmware vsphere client что это за программа?

обновление. Есть трюк, чтобы использовать пул fork-join не по умолчанию для Stream#parallel см. пользовательский пул потоков в Java 8 parallel stream

обновление. Все вышесказанное основано на опыте с RxJava 1.х. Теперь что RxJava 2.x здесь этот ответ может быть устаревшей.

Java 8 Stream и RxJava выглядят довольно похожими. Они имеют похожие операторы (filter, map, flatMap. ) но не построены для такого же использования.

вы можете выполнять задачи asynchonus с помощью RxJava.

с потоком Java 8 вы будете пересекать элементы своей коллекции.

вы можете сделать почти то же самое в RxJava (элементы траверса коллекции), но, поскольку RxJava сосредоточена на параллельной задаче. он использует синхронизацию, защелку . Так та же задача использование RxJava может быть медленнее, чем с потоком Java 8.

RxJava можно сравнить с CompletableFuture , но это может быть в состоянии вычислить более одного значения.

есть несколько технических и концептуальных различий, например, потоки Java 8-это одноразовые, основанные на вытягивании, синхронные последовательности значений, тогда как наблюдаемые RxJava-повторно наблюдаемые, адаптивно основанные на выталкивании, потенциально асинхронные последовательности значений. RxJava нацелен на Java 6+ и работает на Android, а также.

потоки Java 8 основаны на вытягивании. Вы перебираете поток Java 8, потребляющий каждый элемент. И это может быть бесконечный поток.

RXJava Observable по умолчанию push-based. Вы подписываетесь на Observable, и вы получите уведомление, когда следующий пункт прибывает ( onNext ), или когда поток завершен ( onCompleted ), или когда произошла ошибка ( onError ). Потому что с Observable появляется onNext , onCompleted , onError событий, вы можете сделать некоторые мощные функции, такие как объединение разные Observable s для нового ( zip , merge , concat ). Другие вещи, которые вы можете сделать, это кэширование, дросселирование . И он использует более или менее один и тот же API на разных языках (RxJava, RX в C#, RxJS, . )

по умолчанию RxJava однопоточный. Если вы не начнете использовать планировщики, все будет происходить в одном потоке.

существующие ответы являются всеобъемлющими и правильными, но четкий пример для начинающих отсутствует. Позвольте мне поставить некоторые конкретные термины, такие как” толчок/тяга “и”повторно наблюдаемый”. Примечание: ненавижу термин Observable (это поток ради Бога), поэтому просто будет ссылаться на потоки J8 vs RX.

рассмотрим список, если целые,

поток J8-это утилита для изменения коллекции. Например, даже цифры могут быть извлечены as,

это в основном Python в карта, фильтр, уменьшить, очень хорошее (и давно просроченное) дополнение к Java. Но что, если цифры не были сохранены заранее? Можно ли фильтровать четные цифры?

представьте, что отдельный процесс потока выводит целые числа случайным образом ( — обозначает время)

в RX, even can реагировать к каждой новой цифре и примените фильтр в в реальном времени

нет необходимости хранить входные и выходные списки. Если ты . –23–>хочу список вывода, нет проблем, которые также можно передавать. На самом деле,все это поток.

вот почему такие термины, как” без гражданства “и” функциональный”, больше связаны с RX

RxJava также тесно связан с инициатива реактивных потоков и рассматривает его как простую реализацию API реактивных потоков (например, по сравнению с реализация потоков Akka). Основное отличие состоит в том, что реактивные потоки предназначены для обработки обратного давления, но если вы посмотрите на страницу реактивных потоков, вы получите идею. Они довольно хорошо описывают свои цели, и потоки также тесно связаны с реактивный манифест.

потоки Java 8 в значительной степени являются реализацией неограниченной коллекции, довольно похожей на Поток Scala или Clojure lazy seq.

потоки Java 8 позволяют эффективно обрабатывать действительно большие коллекции, используя многоядерные архитектуры. Напротив, RxJava по умолчанию однопоточен (без планировщиков). Поэтому RxJava не будет использовать многоядерные машины, если вы сами не закодируете эту логику.

Nurlandroid

Nurlan’s personal website

Show menu Hide menu

My name is Nurlan Nabiyev. I am passionate Android developer. This is my personal website with my portfolio of apps, posts and etc.

Contacts

Phone: +7 705 575 4828

Вопросы по RxJava на собеседованиях

Ответы на самые частые вопросы по RxJava

1.Какому «паттерну поведения» следует RxJava? Push или Pull?

В RxJava новые данные «заталкиваются»(Push) к наблюдателям

2.Какова разница между колбэками onNext(), onComplete() and onError()?

Эти колбэки которые получают Observable / Flowable. onNext() отрабатывает для каждой эмиссии данных. onComplete() и onError() взаимоисключаемы. Первый отработает когда эмиссия данных завершилась, а второй если произошла ошибка.

3.Сколько раз колбэки onNext(), onComplete() and onError() могут быть вызваны?

Читать еще:  Видео лучших докладов Java-конференции JPoint 2015 — Часть 1

onNext() – от 0 до бесконечного кол-ва раз
onComplete() – максимум один раз за поток
onError() – максимум один раз за поток

4.Когда Observable начинает эмиссию частей данных?

Есть 2 вида Observable — «холодные» и «горячие». Холодные начинают эмиссию данных только тогда когда на них кто-нибудь подпишется. Горячие же эмитят данные вне зависимости от того есть ли подписант или нет.

5. Какая разница между «холодными» и «горячими» Observables

В том что они эмитят данные по разному. Холодные создаются множество раз и каждого инстанса могут подключены разные слушатели(подписанты), с разной логикой. Горячие же похоже на «поток» событий — разные слушатели могут подписываться на горячий observable, но такой observable создается один раз.

6. Можно ли трансформировать «холодный» Observable в «горячий»?

Можно используя операторы publish().connect() . publish() трансформирует обычный Observable в ConnectableObservable, который имеет повадки «горячего». Сразу после того как будет задействован оператор connect() трансформированный Observable начнет эмитить данные вне зависимости от того есть подписчики или нет.

Другой метод трансформации это обернуть Observable Subject-ом. В данном случае Subject подписывается на «холодный» Observable незамедлительно и раскрывает себя как Observable для будущих подписчиков. Опять же, работа выполняется независимо от того, есть ли какие-либо подписчики или нет, с другой стороны, несколько подписчиков Subject-а не будут инициировать начальную эмиссию данных несколько раз.

7. Можно ли трансформировать «горячий» observable в «холодный»?

Можно несколькими путями. Первый подход — использование оператора defer(). Этот оператор откладывает создание «горячего» Observable, поэтому каждый новый подписчик снова запускает работу.

8. Что такое Планировщик (Scheduler)? Как RxJava использует их?

По умолчанию RxJava использует один поток — все операции выполняются на одном потоке. Планировщик помогает переключить выполнение определенного блока кода в иной поток.

9. Что такое цепочка-Observable?

Список операций / преобразований, выполненных между источником и конечным подписчиком. Простой пример — создание объекта User, фильтрация пользователей-администраторов оператором filter(), проверка их подлинности оператором filter() и, наконец, полное имя
оператором map().

10. Какая разница между операторами observeOn() и subscribeOn()?

subscribeOn() — используется для того, чтобы указать планировщику на каком потоке будет проходить основная работа. Например, очень тяжелые вычисления, которые могут затормозить UI-поток, можно перенести в рабочий поток используя данный оператор. Пример: subscribeOn(Schedulers.newThread())

observeOn() — указывает планировщику поток, на котором будут выполняться все последующие операции. Другими словами, он меняет поток для всех операторов после него. Например, после тяжелых вычислений на рабочем потоке, нужно показать результат на UI-потоке. Здесь мы можем написать observeOn(AndroidSchedulers.mainThread())

11. Что будет если много раз выполнить оператор subscribeOn() в цепочке?

Только первый оператор даст желаемый эффект. Остальные же эффекта не дадут, кроме траты ресурсов.

12. Что будет если много раз выполнить оператор observeOn() в цепочке?

Каждый observeOn() включает планировщик (поток), в котором будут выполняться все последующие операторы. Сложные потоки RxJava могут выиграть от нескольких операторов observeOn().

13. Какая разница между операторами map() и flatMap()?

Оператор map() просто превращает(мэппит) ЗначениеА в ЗначениеБ. Например: объекты в списке типа Int превратить в объект типа String Обрабатываются уже готовые/полученные значения.

Оператор flatMap() позволяет превращать-оборачивать получаемые значения в новые потоки данных. Например, если получаем из одного потока строку(типа URL) СтрокаБ, то эту строку мы можем превратить в новый поток. Если Поток испускает несколько элементов, все они в конечном итоге будут переданы исходному наблюдателю (будут «сведены» к одному Observer-у).
Поскольку нет никаких ограничений на Поток , flatMap() полезен для развертывания параллелизма в выполнении задач.

14. Какая разница между операторами flatMap(), concatMap() и switchMap()?

Оператор flatMap() — как мы говорили выше — может разделить цепочку выполнения на несколько промежуточных потоков, а результаты будут получены Observer-ом. Следует отметить, что порядок получения результатов будет в соответствии с тем какой результат был получен первым.

Оператор concatMap() работает также как flatMap(), но только сохраняет порядок выполнения потоков. Из-за этого выполнение этого оператора может занять больше времени.

Оператор switchMap() тоже чем-то похож на flatMap(), но с единственным исключением — при получении нового элемента из цепочки предыдущие потоки созданные из предыдущих элементов уничтожаются. Проще говоря, используя данный оператор активным будет только последний Observable из последнего полученного элемента. Стало быть результат выполнения получим только самого последнего Observable.

голоса
Рейтинг статьи
Ссылка на основную публикацию
Статьи c упоминанием слов: